Split double write task
This commit is contained in:
parent
40dec33ff6
commit
8f94646bee
@ -521,6 +521,8 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
|
||||
Loggers.SRV_LOG.info("cluster size, new: {}, old: {}", getClusterMap().size(), vDom.getClusterMap().size());
|
||||
|
||||
recalculateChecksum();
|
||||
ApplicationUtils.getBean(DoubleWriteEventListener.class)
|
||||
.doubleWriteMetadataToV2(this, vDom.allIPs(false).isEmpty());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -61,8 +61,8 @@ public abstract class AbstractClient implements Client {
|
||||
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
|
||||
if (null == publishers.put(service, instancePublishInfo)) {
|
||||
MetricsMonitor.getIpCountMonitor().incrementAndGet();
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
|
||||
}
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
|
||||
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
|
||||
return true;
|
||||
}
|
||||
|
@ -61,7 +61,7 @@ public class ConnectionBasedClientManager extends ClientConnectionEventListener
|
||||
public boolean clientConnected(Client client) {
|
||||
Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
|
||||
if (!clients.containsKey(client.getClientId())) {
|
||||
clients.put(client.getClientId(), (ConnectionBasedClient) client);
|
||||
clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -57,7 +57,7 @@ public class EphemeralIpPortClientManager implements ClientManager {
|
||||
public boolean clientConnected(Client client) {
|
||||
Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
|
||||
if (!clients.containsKey(client.getClientId())) {
|
||||
clients.put(client.getClientId(), (IpPortBasedClient) client);
|
||||
clients.putIfAbsent(client.getClientId(), (IpPortBasedClient) client);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ public class PersistentIpPortClientManager implements ClientManager {
|
||||
public boolean clientConnected(Client client) {
|
||||
Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
|
||||
if (!clients.containsKey(client.getClientId())) {
|
||||
clients.put(client.getClientId(), (IpPortBasedClient) client);
|
||||
clients.putIfAbsent(client.getClientId(), (IpPortBasedClient) client);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -79,8 +79,16 @@ public class PersistentIpPortClientManager implements ClientManager {
|
||||
return clients.containsKey(clientId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute and do operation new client when client not exist.
|
||||
*
|
||||
* @param clientId clientId
|
||||
* @param supplier operation
|
||||
* @return Client saved in manager
|
||||
*/
|
||||
public Client computeIfAbsent(final String clientId, final Supplier<IpPortBasedClient> supplier) {
|
||||
clients.computeIfAbsent(clientId, s -> supplier.get());
|
||||
Loggers.SRV_LOG.info("Client connection {} connect", clientId);
|
||||
return getClient(clientId);
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,9 @@ import com.alibaba.nacos.core.utils.Loggers;
|
||||
import com.alibaba.nacos.naming.core.v2.ServiceManager;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteEventListener;
|
||||
import com.alibaba.nacos.naming.utils.Constants;
|
||||
import com.alibaba.nacos.sys.utils.ApplicationUtils;
|
||||
import org.apache.commons.lang3.reflect.TypeUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@ -119,6 +121,7 @@ public class ServiceMetadataProcessor extends RequestProcessor4CP {
|
||||
Service singleton = ServiceManager.getInstance().getSingleton(service);
|
||||
namingMetadataManager.updateServiceMetadata(singleton, op.getMetadata());
|
||||
}
|
||||
doubleWriteMetadata(service);
|
||||
}
|
||||
|
||||
private void updateServiceMetadata(MetadataOperation<ServiceMetadata> op) {
|
||||
@ -133,6 +136,17 @@ public class ServiceMetadataProcessor extends RequestProcessor4CP {
|
||||
Service singleton = ServiceManager.getInstance().getSingleton(service);
|
||||
namingMetadataManager.updateServiceMetadata(singleton, op.getMetadata());
|
||||
}
|
||||
doubleWriteMetadata(service);
|
||||
}
|
||||
|
||||
/**
|
||||
* Only for downgrade to v1.x.
|
||||
*
|
||||
* @param service double write service
|
||||
* @deprecated will remove in v2.1.x
|
||||
*/
|
||||
private void doubleWriteMetadata(Service service) {
|
||||
ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteMetadataToV1(service);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -30,8 +30,11 @@ import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
|
||||
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
|
||||
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
|
||||
import com.alibaba.nacos.naming.core.ServiceManager;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.RefreshStorageDataTask;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher;
|
||||
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
|
||||
import com.alibaba.nacos.sys.env.EnvUtil;
|
||||
import org.codehaus.jackson.Version;
|
||||
@ -185,6 +188,18 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
|
||||
Loggers.SRV_LOG.info("Upgrade to 2.0.X");
|
||||
useGrpcFeatures.compareAndSet(false, true);
|
||||
useJraftFeatures.set(true);
|
||||
refreshPersistentServices();
|
||||
}
|
||||
|
||||
private void refreshPersistentServices() {
|
||||
for (String each : com.alibaba.nacos.naming.core.v2.ServiceManager.getInstance().getAllNamespaces()) {
|
||||
for (Service service : com.alibaba.nacos.naming.core.v2.ServiceManager.getInstance().getSingletons(each)) {
|
||||
if (!service.isEphemeral()) {
|
||||
NamingExecuteTaskDispatcher.getInstance()
|
||||
.dispatchAndExecuteTask(service, new RefreshStorageDataTask(service));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite;
|
||||
|
||||
import com.alibaba.nacos.common.task.AbstractExecuteTask;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.sys.utils.ApplicationUtils;
|
||||
|
||||
/**
|
||||
* Refresh service storage cache data when upgrading.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public class RefreshStorageDataTask extends AbstractExecuteTask {
|
||||
|
||||
private final Service service;
|
||||
|
||||
public RefreshStorageDataTask(Service service) {
|
||||
this.service = service;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
ApplicationUtils.getBean(ServiceStorage.class).getPushData(service);
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay;
|
||||
|
||||
/**
|
||||
* Double write content.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public enum DoubleWriteContent {
|
||||
|
||||
/**
|
||||
* Only write metadata.
|
||||
*/
|
||||
METADATA,
|
||||
/**
|
||||
* Only write instance.
|
||||
*/
|
||||
INSTANCE,
|
||||
/**
|
||||
* Both of content.
|
||||
*/
|
||||
BOTH;
|
||||
}
|
@ -49,7 +49,7 @@ public class DoubleWriteEventListener extends Subscriber<ServiceEvent.ServiceCha
|
||||
return;
|
||||
}
|
||||
String taskKey = ServiceChangeV2Task.getKey(event.getService());
|
||||
ServiceChangeV2Task task = new ServiceChangeV2Task(event.getService());
|
||||
ServiceChangeV2Task task = new ServiceChangeV2Task(event.getService(), DoubleWriteContent.INSTANCE);
|
||||
doubleWriteDelayTaskEngine.addTask(taskKey, task);
|
||||
}
|
||||
|
||||
@ -58,6 +58,19 @@ public class DoubleWriteEventListener extends Subscriber<ServiceEvent.ServiceCha
|
||||
return ServiceEvent.ServiceChangedEvent.class;
|
||||
}
|
||||
|
||||
/**
|
||||
* Double write service metadata from v2 to v1.
|
||||
*
|
||||
* @param service service for v2
|
||||
*/
|
||||
public void doubleWriteMetadataToV1(com.alibaba.nacos.naming.core.v2.pojo.Service service) {
|
||||
if (!upgradeJudgement.isUseGrpcFeatures()) {
|
||||
return;
|
||||
}
|
||||
doubleWriteDelayTaskEngine.addTask(ServiceChangeV2Task.getKey(service),
|
||||
new ServiceChangeV2Task(service, DoubleWriteContent.METADATA));
|
||||
}
|
||||
|
||||
/**
|
||||
* Double write service from v1 to v2.
|
||||
*
|
||||
@ -71,6 +84,22 @@ public class DoubleWriteEventListener extends Subscriber<ServiceEvent.ServiceCha
|
||||
String namespace = service.getNamespaceId();
|
||||
String serviceName = service.getName();
|
||||
doubleWriteDelayTaskEngine.addTask(ServiceChangeV1Task.getKey(namespace, serviceName, ephemeral),
|
||||
new ServiceChangeV1Task(namespace, serviceName, ephemeral));
|
||||
new ServiceChangeV1Task(namespace, serviceName, ephemeral, DoubleWriteContent.INSTANCE));
|
||||
}
|
||||
|
||||
/**
|
||||
* Double write service metadata from v1 to v2.
|
||||
*
|
||||
* @param service service for v1
|
||||
* @param ephemeral ephemeral of service
|
||||
*/
|
||||
public void doubleWriteMetadataToV2(Service service, boolean ephemeral) {
|
||||
if (upgradeJudgement.isUseGrpcFeatures()) {
|
||||
return;
|
||||
}
|
||||
String namespace = service.getNamespaceId();
|
||||
String serviceName = service.getName();
|
||||
doubleWriteDelayTaskEngine.addTask(ServiceChangeV1Task.getKey(namespace, serviceName, ephemeral),
|
||||
new ServiceChangeV1Task(namespace, serviceName, ephemeral, DoubleWriteContent.METADATA));
|
||||
}
|
||||
}
|
||||
|
@ -53,16 +53,26 @@ public class ServiceChangeV1Task extends AbstractDelayTask {
|
||||
|
||||
private final boolean ephemeral;
|
||||
|
||||
public ServiceChangeV1Task(String namespace, String serviceName, boolean ephemeral) {
|
||||
private DoubleWriteContent content;
|
||||
|
||||
public ServiceChangeV1Task(String namespace, String serviceName, boolean ephemeral, DoubleWriteContent content) {
|
||||
this.namespace = namespace;
|
||||
this.serviceName = serviceName;
|
||||
this.ephemeral = ephemeral;
|
||||
this.content = content;
|
||||
setLastProcessTime(System.currentTimeMillis());
|
||||
setTaskInterval(1000L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void merge(AbstractDelayTask task) {
|
||||
if (!(task instanceof ServiceChangeV1Task)) {
|
||||
return;
|
||||
}
|
||||
ServiceChangeV1Task oldTask = (ServiceChangeV1Task) task;
|
||||
if (!content.equals(oldTask.getContent())) {
|
||||
content = DoubleWriteContent.BOTH;
|
||||
}
|
||||
}
|
||||
|
||||
public String getNamespace() {
|
||||
@ -77,6 +87,10 @@ public class ServiceChangeV1Task extends AbstractDelayTask {
|
||||
return ephemeral;
|
||||
}
|
||||
|
||||
public DoubleWriteContent getContent() {
|
||||
return content;
|
||||
}
|
||||
|
||||
public static String getKey(String namespace, String serviceName, boolean ephemeral) {
|
||||
return "v1:" + namespace + "_" + serviceName + "_" + ephemeral;
|
||||
}
|
||||
@ -86,34 +100,51 @@ public class ServiceChangeV1Task extends AbstractDelayTask {
|
||||
@Override
|
||||
public boolean process(NacosTask task) {
|
||||
ServiceChangeV1Task serviceTask = (ServiceChangeV1Task) task;
|
||||
Loggers.SRV_LOG.info("double write for service {}", serviceTask.getServiceName());
|
||||
Loggers.SRV_LOG.info("double write for service {}, content {}", serviceTask.getServiceName(),
|
||||
serviceTask.getContent());
|
||||
ServiceManager serviceManager = ApplicationUtils.getBean(ServiceManager.class);
|
||||
Service service = serviceManager.getService(serviceTask.getNamespace(), serviceTask.getServiceName());
|
||||
ServiceMetadata serviceMetadata = parseServiceMetadata(service, serviceTask.isEphemeral());
|
||||
DoubleWriteMetadataChangeToV2Task metadataTask = new DoubleWriteMetadataChangeToV2Task(
|
||||
service.getNamespaceId(), service.getName(), serviceTask.isEphemeral(), serviceMetadata);
|
||||
NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service.getName(), metadataTask);
|
||||
switch (serviceTask.getContent()) {
|
||||
case METADATA:
|
||||
dispatchMetadataTask(service, serviceTask.isEphemeral());
|
||||
break;
|
||||
case INSTANCE:
|
||||
dispatchInstanceTask(service, serviceTask.isEphemeral());
|
||||
break;
|
||||
default:
|
||||
dispatchAllTask(service, serviceTask.isEphemeral());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void dispatchAllTask(Service service, boolean ephemeral) {
|
||||
dispatchMetadataTask(service, ephemeral);
|
||||
dispatchInstanceTask(service, ephemeral);
|
||||
}
|
||||
|
||||
private void dispatchInstanceTask(Service service, boolean ephemeral) {
|
||||
ServiceStorage serviceStorage = ApplicationUtils.getBean(ServiceStorage.class);
|
||||
ServiceInfo serviceInfo = serviceStorage.getPushData(transfer(service, serviceTask.isEphemeral()));
|
||||
List<Instance> newInstance = service.allIPs(serviceTask.isEphemeral());
|
||||
ServiceInfo serviceInfo = serviceStorage.getPushData(transfer(service, ephemeral));
|
||||
List<Instance> newInstance = service.allIPs(ephemeral);
|
||||
Set<String> instances = new HashSet<>();
|
||||
for (Instance each : newInstance) {
|
||||
instances.add(each.toIpAddr());
|
||||
DoubleWriteInstanceChangeToV2Task instanceTask = new DoubleWriteInstanceChangeToV2Task(
|
||||
service.getNamespaceId(), service.getName(), each, true);
|
||||
NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(
|
||||
IpPortBasedClient.getClientId(each.toIpAddr(), serviceTask.isEphemeral()), instanceTask);
|
||||
NamingExecuteTaskDispatcher.getInstance()
|
||||
.dispatchAndExecuteTask(IpPortBasedClient.getClientId(each.toIpAddr(), ephemeral),
|
||||
instanceTask);
|
||||
}
|
||||
List<com.alibaba.nacos.api.naming.pojo.Instance> oldInstance = serviceInfo.getHosts();
|
||||
for (com.alibaba.nacos.api.naming.pojo.Instance each : oldInstance) {
|
||||
if (!instances.contains(each.toInetAddr())) {
|
||||
DoubleWriteInstanceChangeToV2Task instanceTask = new DoubleWriteInstanceChangeToV2Task(
|
||||
service.getNamespaceId(), service.getName(), each, false);
|
||||
NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(
|
||||
IpPortBasedClient.getClientId(each.toInetAddr(), serviceTask.isEphemeral()), instanceTask);
|
||||
NamingExecuteTaskDispatcher.getInstance()
|
||||
.dispatchAndExecuteTask(IpPortBasedClient.getClientId(each.toInetAddr(), ephemeral),
|
||||
instanceTask);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private com.alibaba.nacos.naming.core.v2.pojo.Service transfer(Service service, boolean ephemeral) {
|
||||
@ -122,6 +153,13 @@ public class ServiceChangeV1Task extends AbstractDelayTask {
|
||||
NamingUtils.getServiceName(service.getName()), ephemeral);
|
||||
}
|
||||
|
||||
private void dispatchMetadataTask(Service service, boolean ephemeral) {
|
||||
ServiceMetadata serviceMetadata = parseServiceMetadata(service, ephemeral);
|
||||
DoubleWriteMetadataChangeToV2Task metadataTask = new DoubleWriteMetadataChangeToV2Task(
|
||||
service.getNamespaceId(), service.getName(), ephemeral, serviceMetadata);
|
||||
NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service.getName(), metadataTask);
|
||||
}
|
||||
|
||||
private ServiceMetadata parseServiceMetadata(Service service, boolean ephemeral) {
|
||||
ServiceMetadata result = new ServiceMetadata();
|
||||
result.setEphemeral(ephemeral);
|
||||
|
@ -34,16 +34,32 @@ public class ServiceChangeV2Task extends AbstractDelayTask {
|
||||
|
||||
private final Service changedService;
|
||||
|
||||
public ServiceChangeV2Task(Service service) {
|
||||
private DoubleWriteContent content;
|
||||
|
||||
public ServiceChangeV2Task(Service service, DoubleWriteContent content) {
|
||||
changedService = service;
|
||||
this.content = content;
|
||||
setTaskInterval(1000L);
|
||||
setLastProcessTime(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
public Service getChangedService() {
|
||||
return changedService;
|
||||
}
|
||||
|
||||
public DoubleWriteContent getContent() {
|
||||
return content;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void merge(AbstractDelayTask task) {
|
||||
if (!(task instanceof ServiceChangeV2Task)) {
|
||||
return;
|
||||
}
|
||||
ServiceChangeV2Task oldTask = (ServiceChangeV2Task) task;
|
||||
if (!content.equals(oldTask.getContent())) {
|
||||
content = DoubleWriteContent.BOTH;
|
||||
}
|
||||
}
|
||||
|
||||
public static String getKey(Service service) {
|
||||
@ -55,15 +71,36 @@ public class ServiceChangeV2Task extends AbstractDelayTask {
|
||||
@Override
|
||||
public boolean process(NacosTask task) {
|
||||
ServiceChangeV2Task serviceTask = (ServiceChangeV2Task) task;
|
||||
Loggers.SRV_LOG.info("double write for service {}", serviceTask.getChangedService());
|
||||
Service changedService = serviceTask.getChangedService();
|
||||
DoubleWriteMetadataChangeToV1Task metadataTask = new DoubleWriteMetadataChangeToV1Task(changedService);
|
||||
NamingExecuteTaskDispatcher.getInstance()
|
||||
.dispatchAndExecuteTask(changedService.getGroupedServiceName(), metadataTask);
|
||||
Loggers.SRV_LOG.info("double write for service {}, content {}", changedService, serviceTask.getContent());
|
||||
switch (serviceTask.getContent()) {
|
||||
case INSTANCE:
|
||||
dispatchInstanceChangeTask(changedService);
|
||||
break;
|
||||
case METADATA:
|
||||
dispatchMetadataChangeTask(changedService);
|
||||
break;
|
||||
default:
|
||||
dispatchAllTask(changedService);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void dispatchInstanceChangeTask(Service changedService) {
|
||||
DoubleWriteInstanceChangeToV1Task instanceTask = new DoubleWriteInstanceChangeToV1Task(changedService);
|
||||
NamingExecuteTaskDispatcher.getInstance()
|
||||
.dispatchAndExecuteTask(changedService.getGroupedServiceName(), instanceTask);
|
||||
return true;
|
||||
}
|
||||
|
||||
private void dispatchMetadataChangeTask(Service changedService) {
|
||||
DoubleWriteMetadataChangeToV1Task metadataTask = new DoubleWriteMetadataChangeToV1Task(changedService);
|
||||
NamingExecuteTaskDispatcher.getInstance()
|
||||
.dispatchAndExecuteTask(changedService.getGroupedServiceName(), metadataTask);
|
||||
}
|
||||
|
||||
private void dispatchAllTask(Service changedService) {
|
||||
dispatchMetadataChangeTask(changedService);
|
||||
dispatchInstanceChangeTask(changedService);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,9 @@ import com.alibaba.nacos.naming.core.Instances;
|
||||
import com.alibaba.nacos.naming.core.ServiceManager;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteContent;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV2Task;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.sys.utils.ApplicationUtils;
|
||||
|
||||
@ -60,6 +63,10 @@ public class DoubleWriteInstanceChangeToV1Task extends AbstractExecuteTask {
|
||||
if (Loggers.SRV_LOG.isDebugEnabled()) {
|
||||
Loggers.SRV_LOG.debug("Double write task for {} instance from 2 to 1 failed", service, e);
|
||||
}
|
||||
ServiceChangeV2Task retryTask = new ServiceChangeV2Task(service, DoubleWriteContent.INSTANCE);
|
||||
retryTask.setTaskInterval(3000L);
|
||||
String taskKey = ServiceChangeV2Task.getKey(service);
|
||||
ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@ package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.common.task.AbstractExecuteTask;
|
||||
import com.alibaba.nacos.naming.core.InstanceOperatorClientImpl;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteContent;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
@ -61,7 +62,8 @@ public class DoubleWriteInstanceChangeToV2Task extends AbstractExecuteTask {
|
||||
Loggers.SRV_LOG
|
||||
.debug("Double write task for {}#{} instance from 1 to 2 failed", namespace, serviceName, e);
|
||||
}
|
||||
ServiceChangeV1Task retryTask = new ServiceChangeV1Task(namespace, serviceName, instance.isEphemeral());
|
||||
ServiceChangeV1Task retryTask = new ServiceChangeV1Task(namespace, serviceName, instance.isEphemeral(),
|
||||
DoubleWriteContent.INSTANCE);
|
||||
retryTask.setTaskInterval(3000L);
|
||||
String taskKey = ServiceChangeV1Task.getKey(namespace, serviceName, instance.isEphemeral());
|
||||
ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask);
|
||||
|
@ -23,6 +23,7 @@ import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteContent;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV2Task;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
@ -59,7 +60,7 @@ public class DoubleWriteMetadataChangeToV1Task extends AbstractExecuteTask {
|
||||
if (Loggers.SRV_LOG.isDebugEnabled()) {
|
||||
Loggers.SRV_LOG.debug("Double write task for {} metadata from 2 to 1 failed", service, e);
|
||||
}
|
||||
ServiceChangeV2Task retryTask = new ServiceChangeV2Task(service);
|
||||
ServiceChangeV2Task retryTask = new ServiceChangeV2Task(service, DoubleWriteContent.METADATA);
|
||||
retryTask.setTaskInterval(3000L);
|
||||
String taskKey = ServiceChangeV2Task.getKey(service);
|
||||
ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask);
|
||||
|
@ -23,6 +23,7 @@ import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteContent;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
@ -67,7 +68,7 @@ public class DoubleWriteMetadataChangeToV2Task extends AbstractExecuteTask {
|
||||
Loggers.SRV_LOG.debug("Double write task for {} metadata from 2 to 1 failed", service, e);
|
||||
}
|
||||
ServiceChangeV1Task retryTask = new ServiceChangeV1Task(service.getNamespace(),
|
||||
service.getGroupedServiceName(), service.isEphemeral());
|
||||
service.getGroupedServiceName(), service.isEphemeral(), DoubleWriteContent.METADATA);
|
||||
retryTask.setTaskInterval(3000L);
|
||||
String taskKey = ServiceChangeV1Task
|
||||
.getKey(service.getNamespace(), service.getGroupedServiceName(), service.isEphemeral());
|
||||
|
@ -76,7 +76,7 @@ public class UpgradeJudgementTest {
|
||||
public void setUp() throws Exception {
|
||||
EnvUtil.setEnvironment(environment);
|
||||
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
|
||||
doubleWriteDelayTaskEngine);
|
||||
doubleWriteDelayTaskEngine, serviceStorage);
|
||||
}
|
||||
|
||||
@After
|
||||
|
Loading…
Reference in New Issue
Block a user