Add double write logic for downgrade

This commit is contained in:
KomachiSion 2021-01-26 15:47:02 +08:00
parent cecf0c0893
commit 40dec33ff6
10 changed files with 368 additions and 10 deletions

View File

@ -21,8 +21,7 @@ import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteEventListener;
import com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask;
import com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
@ -277,9 +276,7 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
setLastModifiedMillis(System.currentTimeMillis());
getPushService().serviceChanged(this);
ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class)
.addTask(ServiceChangeV1Task.getKey(namespaceId, getName(), ephemeral),
new ServiceChangeV1Task(namespaceId, getName(), ephemeral));
ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {

View File

@ -33,6 +33,7 @@ import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.util.VersionUtil;
import org.springframework.stereotype.Component;
@ -71,7 +72,7 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
private final DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine;
private final ScheduledExecutorService upgradeChecker;
private ScheduledExecutorService upgradeChecker;
public UpgradeJudgement(RaftPeerSet raftPeerSet, RaftCore raftCore, ClusterVersionJudgement versionJudgement,
ServerMemberManager memberManager, ServiceManager serviceManager,
@ -82,6 +83,15 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
this.memberManager = memberManager;
this.serviceManager = serviceManager;
this.doubleWriteDelayTaskEngine = doubleWriteDelayTaskEngine;
if (!EnvUtil.getStandaloneMode()) {
initUpgradeChecker();
} else {
useGrpcFeatures.set(true);
}
NotifyCenter.registerSubscriber(this);
}
private void initUpgradeChecker() {
upgradeChecker = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory("upgrading.checker"));
upgradeChecker.scheduleAtFixedRate(() -> {
if (isUseGrpcFeatures()) {
@ -93,7 +103,6 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
doUpgrade();
}
}, 100L, 5000L, TimeUnit.MILLISECONDS);
NotifyCenter.registerSubscriber(this);
}
@JustForTest
@ -183,7 +192,12 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
return MembersChangeEvent.class;
}
/**
* Shut down.
*/
public void shutdown() {
if (null != upgradeChecker) {
upgradeChecker.shutdownNow();
}
}
}

View File

@ -32,6 +32,7 @@ public class DoubleWriteDelayTaskEngine extends NacosDelayTaskExecuteEngine {
public DoubleWriteDelayTaskEngine() {
super(DoubleWriteDelayTaskEngine.class.getSimpleName(), Loggers.SRV_LOG);
addProcessor("v1", new ServiceChangeV1Task.ServiceChangeV1TaskProcessor());
addProcessor("v2", new ServiceChangeV2Task.ServiceChangeV2TaskProcessor());
}
@Override

View File

@ -0,0 +1,76 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import org.springframework.stereotype.Component;
/**
* Event listener for double write.
*
* @author xiweng.yy
*/
@Component
public class DoubleWriteEventListener extends Subscriber<ServiceEvent.ServiceChangedEvent> {
private final UpgradeJudgement upgradeJudgement;
private final DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine;
public DoubleWriteEventListener(UpgradeJudgement upgradeJudgement,
DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine) {
this.upgradeJudgement = upgradeJudgement;
this.doubleWriteDelayTaskEngine = doubleWriteDelayTaskEngine;
NotifyCenter.registerSubscriber(this);
}
@Override
public void onEvent(ServiceEvent.ServiceChangedEvent event) {
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
String taskKey = ServiceChangeV2Task.getKey(event.getService());
ServiceChangeV2Task task = new ServiceChangeV2Task(event.getService());
doubleWriteDelayTaskEngine.addTask(taskKey, task);
}
@Override
public Class<? extends Event> subscribeType() {
return ServiceEvent.ServiceChangedEvent.class;
}
/**
* Double write service from v1 to v2.
*
* @param service service for v1
* @param ephemeral ephemeral of service
*/
public void doubleWriteToV2(Service service, boolean ephemeral) {
if (upgradeJudgement.isUseGrpcFeatures()) {
return;
}
String namespace = service.getNamespaceId();
String serviceName = service.getName();
doubleWriteDelayTaskEngine.addTask(ServiceChangeV1Task.getKey(namespace, serviceName, ephemeral),
new ServiceChangeV1Task(namespace, serviceName, ephemeral));
}
}

View File

@ -41,7 +41,7 @@ import java.util.Map;
import java.util.Set;
/**
* Double write delay task during upgrading.
* Double write delay task for service from v1 to v2 during upgrading.
*
* @author xiweng.yy
*/
@ -105,7 +105,6 @@ public class ServiceChangeV1Task extends AbstractDelayTask {
IpPortBasedClient.getClientId(each.toIpAddr(), serviceTask.isEphemeral()), instanceTask);
}
List<com.alibaba.nacos.api.naming.pojo.Instance> oldInstance = serviceInfo.getHosts();
Loggers.SRV_LOG.info("[TMP_DEBUG] oldInstances {} \n new Instances {}", oldInstance, newInstance);
for (com.alibaba.nacos.api.naming.pojo.Instance each : oldInstance) {
if (!instances.contains(each.toInetAddr())) {
DoubleWriteInstanceChangeToV2Task instanceTask = new DoubleWriteInstanceChangeToV2Task(

View File

@ -0,0 +1,69 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute.DoubleWriteInstanceChangeToV1Task;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute.DoubleWriteMetadataChangeToV1Task;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher;
/**
* Double write delay task for service from v2 to v1 during downgrading.
*
* @author xiweng.yy
*/
public class ServiceChangeV2Task extends AbstractDelayTask {
private final Service changedService;
public ServiceChangeV2Task(Service service) {
changedService = service;
}
public Service getChangedService() {
return changedService;
}
@Override
public void merge(AbstractDelayTask task) {
}
public static String getKey(Service service) {
return "v2:" + service.getNamespace() + "_" + service.getGroupedServiceName() + "_" + service.isEphemeral();
}
public static class ServiceChangeV2TaskProcessor implements NacosTaskProcessor {
@Override
public boolean process(NacosTask task) {
ServiceChangeV2Task serviceTask = (ServiceChangeV2Task) task;
Loggers.SRV_LOG.info("double write for service {}", serviceTask.getChangedService());
Service changedService = serviceTask.getChangedService();
DoubleWriteMetadataChangeToV1Task metadataTask = new DoubleWriteMetadataChangeToV1Task(changedService);
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(changedService.getGroupedServiceName(), metadataTask);
DoubleWriteInstanceChangeToV1Task instanceTask = new DoubleWriteInstanceChangeToV1Task(changedService);
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(changedService.getGroupedServiceName(), instanceTask);
return true;
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
/**
* Double write from 2.x to 1 task.
*
* @author xiweng.yy
*/
public class DoubleWriteInstanceChangeToV1Task extends AbstractExecuteTask {
private final Service service;
public DoubleWriteInstanceChangeToV1Task(Service service) {
this.service = service;
}
@Override
public void run() {
try {
ServiceManager serviceManager = ApplicationUtils.getBean(ServiceManager.class);
com.alibaba.nacos.naming.core.Service serviceV1 = serviceManager
.getService(service.getNamespace(), service.getGroupedServiceName());
if (null == serviceV1) {
serviceManager.createEmptyService(service.getNamespace(), service.getGroupedServiceName(),
service.isEphemeral());
}
Instances newInstances = getNewInstances();
String key = KeyBuilder.buildInstanceListKey(service.getNamespace(), service.getGroupedServiceName(),
service.isEphemeral());
ConsistencyService consistencyService = ApplicationUtils
.getBean("consistencyDelegate", ConsistencyService.class);
consistencyService.put(key, newInstances);
} catch (Exception e) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("Double write task for {} instance from 2 to 1 failed", service, e);
}
}
}
private Instances getNewInstances() {
Instances result = new Instances();
ServiceStorage serviceStorage = ApplicationUtils.getBean(ServiceStorage.class);
long currentTimeStamp = System.currentTimeMillis();
for (Instance each : serviceStorage.getData(service).getHosts()) {
com.alibaba.nacos.naming.core.Instance instance = new com.alibaba.nacos.naming.core.Instance();
instance.setIp(each.getIp());
instance.setPort(each.getPort());
instance.setClusterName(each.getClusterName());
instance.setHealthy(each.isHealthy());
instance.setEphemeral(each.isEphemeral());
instance.setWeight(each.getWeight());
instance.setMetadata(each.getMetadata());
instance.setEnabled(each.isEnabled());
instance.setServiceName(each.getServiceName());
instance.setLastBeat(currentTimeStamp);
result.getInstanceList().add(instance);
}
return result;
}
}

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.naming.core.InstanceOperatorClientImpl;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
/**
@ -56,6 +57,10 @@ public class DoubleWriteInstanceChangeToV2Task extends AbstractExecuteTask {
instanceOperator.removeInstance(namespace, serviceName, instance);
}
} catch (Exception e) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG
.debug("Double write task for {}#{} instance from 1 to 2 failed", namespace, serviceName, e);
}
ServiceChangeV1Task retryTask = new ServiceChangeV1Task(namespace, serviceName, instance.isEphemeral());
retryTask.setTaskInterval(3000L);
String taskKey = ServiceChangeV1Task.getKey(namespace, serviceName, instance.isEphemeral());

View File

@ -0,0 +1,107 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata;
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV2Task;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import java.util.Map;
import java.util.Optional;
/**
* Double write from 2.x to 1 task.
*
* @author xiweng.yy
*/
public class DoubleWriteMetadataChangeToV1Task extends AbstractExecuteTask {
private final Service service;
public DoubleWriteMetadataChangeToV1Task(Service service) {
this.service = service;
}
@Override
public void run() {
try {
NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
Optional<ServiceMetadata> serviceMetadata = metadataManager.getServiceMetadata(service);
if (!serviceMetadata.isPresent()) {
return;
}
ServiceManager serviceManager = ApplicationUtils.getBean(ServiceManager.class);
com.alibaba.nacos.naming.core.Service serviceV1 = newServiceForV1(serviceManager, serviceMetadata.get());
serviceManager.addOrReplaceService(serviceV1);
} catch (Exception e) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("Double write task for {} metadata from 2 to 1 failed", service, e);
}
ServiceChangeV2Task retryTask = new ServiceChangeV2Task(service);
retryTask.setTaskInterval(3000L);
String taskKey = ServiceChangeV2Task.getKey(service);
ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask);
}
}
private com.alibaba.nacos.naming.core.Service newServiceForV1(ServiceManager serviceManager,
ServiceMetadata serviceMetadata) {
com.alibaba.nacos.naming.core.Service result = serviceManager
.getService(service.getNamespace(), service.getGroupedServiceName());
if (null == result) {
result = new com.alibaba.nacos.naming.core.Service(service.getGroupedServiceName());
result.setGroupName(service.getGroup());
result.setNamespaceId(service.getNamespace());
}
result.setSelector(serviceMetadata.getSelector());
result.setProtectThreshold(serviceMetadata.getProtectThreshold());
result.setMetadata(serviceMetadata.getExtendData());
for (Map.Entry<String, ClusterMetadata> entry : serviceMetadata.getClusters().entrySet()) {
if (!result.getClusterMap().containsKey(entry.getKey())) {
result.addCluster(newClusterV1(entry.getValue()));
} else {
updateCluster(result.getClusterMap().get(entry.getKey()), entry.getValue());
}
}
result.init();
return result;
}
private Cluster newClusterV1(ClusterMetadata metadata) {
Cluster result = new Cluster();
result.setDefCkport(metadata.getHealthyCheckPort());
result.setUseIPPort4Check(metadata.isUseInstancePortForCheck());
result.setHealthChecker(metadata.getHealthChecker());
result.setMetadata(metadata.getExtendData());
return result;
}
private void updateCluster(Cluster cluster, ClusterMetadata metadata) {
cluster.setDefCkport(metadata.getHealthyCheckPort());
cluster.setUseIPPort4Check(metadata.isUseInstancePortForCheck());
cluster.setHealthChecker(metadata.getHealthChecker());
cluster.setMetadata(metadata.getExtendData());
}
}

View File

@ -25,6 +25,7 @@ import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.selector.NoneSelector;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
@ -62,6 +63,9 @@ public class DoubleWriteMetadataChangeToV2Task extends AbstractExecuteTask {
}
}
} catch (Exception e) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("Double write task for {} metadata from 2 to 1 failed", service, e);
}
ServiceChangeV1Task retryTask = new ServiceChangeV1Task(service.getNamespace(),
service.getGroupedServiceName(), service.isEphemeral());
retryTask.setTaskInterval(3000L);