From cecf0c08937d360723ebd41a95ad96b6cf7821b1 Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Mon, 25 Jan 2021 17:10:12 +0800 Subject: [PATCH] Add double write logic for upgrade --- .../common/task/engine/TaskExecuteWorker.java | 4 +- core/pom.xml | 2 +- .../core/cluster/MemberMetaDataConstants.java | 5 +- .../distro/DistroConsistencyServiceImpl.java | 6 + .../v2/DistroClientComponentRegistry.java | 10 +- .../distro/v2/DistroClientDataProcessor.java | 10 +- .../persistent/ClusterVersionJudgement.java | 8 +- .../naming/controllers/CatalogController.java | 17 +- .../naming/controllers/ClusterController.java | 15 +- .../controllers/InstanceController.java | 42 ++-- .../naming/controllers/ServiceController.java | 19 +- .../naming/core/CatalogServiceV1Impl.java | 2 +- .../core/InstanceOperatorClientImpl.java | 2 +- .../core/InstanceOperatorServiceImpl.java | 2 +- .../alibaba/nacos/naming/core/Service.java | 15 +- .../core/v2/metadata/ClusterMetadata.java | 2 +- .../core/v2/upgrade/UpgradeJudgement.java | 189 +++++++++++++++ .../delay/DoubleWriteDelayTaskEngine.java | 42 ++++ .../delay/ServiceChangeV1Task.java | 148 ++++++++++++ .../DoubleWriteInstanceChangeToV2Task.java | 65 +++++ .../DoubleWriteMetadataChangeToV2Task.java | 83 +++++++ .../healthcheck/ClientBeatCheckTask.java | 5 + .../naming/healthcheck/HealthCheckTask.java | 5 + .../heartbeat/ClientBeatCheckTaskV2.java | 6 +- .../heartbeat/ClientBeatProcessorV2.java | 2 + .../heartbeat/InstanceBeatCheckTask.java | 6 +- .../HealthCheckEnableInterceptor.java | 8 +- .../healthcheck/v2/HealthCheckTaskV2.java | 9 +- .../v2/processor/HealthCheckCommonV2.java | 6 +- .../v2/processor/TcpHealthCheckProcessor.java | 2 +- .../AbstractNamingInterceptorChain.java | 3 +- .../naming/interceptor/Interceptable.java | 5 + .../nacos/naming/push/UdpPushService.java | 6 + .../v2/NamingSubscriberServiceV2Impl.java | 9 +- .../nacos/naming/web/DistroFilter.java | 79 +++--- .../naming/web/DistroTagGeneratorImpl.java | 19 +- .../core/v2/upgrade/UpgradeJudgementTest.java | 228 ++++++++++++++++++ .../v2/NamingSubscriberServiceV2ImplTest.java | 7 +- 38 files changed, 986 insertions(+), 107 deletions(-) create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteDelayTaskEngine.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/ServiceChangeV1Task.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV2Task.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV2Task.java create mode 100644 naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java index 59fa690f0..63ad61e40 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java @@ -116,10 +116,10 @@ public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable { task.run(); long duration = System.currentTimeMillis() - begin; if (duration > 1000L) { - log.warn("distro task {} takes {}ms", task, duration); + log.warn("task {} takes {}ms", task, duration); } } catch (Throwable e) { - log.error("[DISTRO-FAILED] " + e.toString(), e); + log.error("[TASK-FAILED] " + e.toString(), e); } } } diff --git a/core/pom.xml b/core/pom.xml index 976d5578f..a7347ffce 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ com.alibaba.nacos nacos-all - 2.0.0-1-SNAPSHOT + ${revision} ../pom.xml diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java index 697efac38..a72bc666f 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java @@ -40,5 +40,8 @@ public class MemberMetaDataConstants { public static final String SUPPORT_REMOTE_C_TYPE = "remoteConnectType"; - public static final String[] BASIC_META_KEYS = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT, WEIGHT, VERSION}; + public static final String READY_TO_UPGRADE = "readyToUpgrade"; + + public static final String[] BASIC_META_KEYS = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT, WEIGHT, VERSION, + READY_TO_UPGRADE}; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java index ccc2bbe69..259e6e3cf 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java @@ -34,11 +34,13 @@ import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.Instances; import com.alibaba.nacos.naming.core.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.misc.GlobalConfig; import com.alibaba.nacos.naming.misc.GlobalExecutor; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.naming.pojo.Record; +import com.alibaba.nacos.sys.utils.ApplicationUtils; import org.apache.commons.lang3.StringUtils; import org.javatuples.Pair; import org.springframework.context.annotation.DependsOn; @@ -105,6 +107,10 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService @Override public void put(String key, Record value) throws NacosException { onPut(key, value); + // If upgrade to 2.0.X, do not sync for v1. + if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) { + return; + } distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientComponentRegistry.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientComponentRegistry.java index d204927d1..0ff48ae45 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientComponentRegistry.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientComponentRegistry.java @@ -24,6 +24,7 @@ import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent; import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder; import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager; import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.misc.GlobalConfig; import org.springframework.stereotype.Component; @@ -51,9 +52,12 @@ public class DistroClientComponentRegistry { private final ClusterRpcClientProxy clusterRpcClientProxy; + private final UpgradeJudgement upgradeJudgement; + public DistroClientComponentRegistry(ServerMemberManager serverMemberManager, DistroProtocol distroProtocol, DistroComponentHolder componentHolder, DistroTaskEngineHolder taskEngineHolder, GlobalConfig globalConfig, - ClientManagerDelegate clientManager, ClusterRpcClientProxy clusterRpcClientProxy) { + ClientManagerDelegate clientManager, ClusterRpcClientProxy clusterRpcClientProxy, + UpgradeJudgement upgradeJudgement) { this.serverMemberManager = serverMemberManager; this.distroProtocol = distroProtocol; this.componentHolder = componentHolder; @@ -61,6 +65,7 @@ public class DistroClientComponentRegistry { this.globalConfig = globalConfig; this.clientManager = clientManager; this.clusterRpcClientProxy = clusterRpcClientProxy; + this.upgradeJudgement = upgradeJudgement; } /** @@ -69,7 +74,8 @@ public class DistroClientComponentRegistry { */ @PostConstruct public void doRegister() { - DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol); + DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol, + upgradeJudgement); DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy, serverMemberManager); DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(globalConfig, diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java index 554f46fb8..694653115 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientDataProcessor.java @@ -35,6 +35,7 @@ import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent; import com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent; import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo; import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.sys.env.EnvUtil; import com.alibaba.nacos.sys.utils.ApplicationUtils; @@ -57,9 +58,13 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro private final DistroProtocol distroProtocol; - public DistroClientDataProcessor(ClientManager clientManager, DistroProtocol distroProtocol) { + private final UpgradeJudgement upgradeJudgement; + + public DistroClientDataProcessor(ClientManager clientManager, DistroProtocol distroProtocol, + UpgradeJudgement upgradeJudgement) { this.clientManager = clientManager; this.distroProtocol = distroProtocol; + this.upgradeJudgement = upgradeJudgement; NotifyCenter.registerSubscriber(this); } @@ -76,6 +81,9 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro if (EnvUtil.getStandaloneMode()) { return; } + if (!upgradeJudgement.isUseGrpcFeatures()) { + return; + } ClientEvent clientEvent = (ClientEvent) event; Client client = clientEvent.getClient(); // Only ephemeral data sync by Distro, persist client should sync by raft. diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/ClusterVersionJudgement.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/ClusterVersionJudgement.java index 1ea1c70ce..dbbc2a110 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/ClusterVersionJudgement.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/ClusterVersionJudgement.java @@ -100,13 +100,19 @@ public class ClusterVersionJudgement { for (ConsumerWithPriority consumer : observers) { consumer.consumer.accept(true); } - observers.clear(); } public boolean allMemberIsNewVersion() { return allMemberIsNewVersion; } + /** + * Only used for upgrade to 2.0.0 + */ + public void reset() { + allMemberIsNewVersion = false; + } + private static class ConsumerWithPriority implements Comparable { private final Consumer consumer; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/CatalogController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/CatalogController.java index c9ce30fff..8b15fb41a 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/CatalogController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/CatalogController.java @@ -25,10 +25,12 @@ import com.alibaba.nacos.auth.annotation.Secured; import com.alibaba.nacos.auth.common.ActionTypes; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.core.utils.WebUtils; +import com.alibaba.nacos.naming.core.CatalogService; import com.alibaba.nacos.naming.core.CatalogServiceV1Impl; import com.alibaba.nacos.naming.core.CatalogServiceV2Impl; import com.alibaba.nacos.naming.core.Service; import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.healthcheck.HealthCheckTask; import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.web.NamingResourceParser; @@ -63,6 +65,9 @@ public class CatalogController { @Autowired private CatalogServiceV2Impl catalogServiceV2; + @Autowired + private UpgradeJudgement upgradeJudgement; + /** * Get service detail. * @@ -77,7 +82,7 @@ public class CatalogController { String serviceName) throws NacosException { String serviceNameWithoutGroup = NamingUtils.getServiceName(serviceName); String groupName = NamingUtils.getGroupName(serviceName); - return catalogServiceV2.getServiceDetail(namespaceId, groupName, serviceNameWithoutGroup); + return judgeCatalogService().getServiceDetail(namespaceId, groupName, serviceNameWithoutGroup); } /** @@ -98,7 +103,7 @@ public class CatalogController { @RequestParam int pageSize) throws NacosException { String serviceNameWithoutGroup = NamingUtils.getServiceName(serviceName); String groupName = NamingUtils.getGroupName(serviceName); - List instances = catalogServiceV2 + List instances = judgeCatalogService() .listInstances(namespaceId, groupName, serviceNameWithoutGroup, clusterName); int start = (page - 1) * pageSize; int end = page * pageSize; @@ -146,9 +151,9 @@ public class CatalogController { @RequestParam(required = false) boolean hasIpCount) throws NacosException { if (withInstances) { - return catalogServiceV2.pageListServiceDetail(namespaceId, groupName, serviceName, pageNo, pageSize); + return judgeCatalogService().pageListServiceDetail(namespaceId, groupName, serviceName, pageNo, pageSize); } - return catalogServiceV2 + return judgeCatalogService() .pageListService(namespaceId, groupName, serviceName, pageNo, pageSize, containedInstance, hasIpCount); } @@ -187,4 +192,8 @@ public class CatalogController { result.replace("clusters", clusters); return result; } + + private CatalogService judgeCatalogService() { + return upgradeJudgement.isUseGrpcFeatures() ? catalogServiceV2 : catalogServiceV1; + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/ClusterController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/ClusterController.java index 755ec86ba..4e2ab3fac 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/ClusterController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/ClusterController.java @@ -23,9 +23,11 @@ import com.alibaba.nacos.api.naming.pojo.healthcheck.HealthCheckerFactory; import com.alibaba.nacos.auth.annotation.Secured; import com.alibaba.nacos.auth.common.ActionTypes; import com.alibaba.nacos.core.utils.WebUtils; +import com.alibaba.nacos.naming.core.ClusterOperator; import com.alibaba.nacos.naming.core.ClusterOperatorV1Impl; import com.alibaba.nacos.naming.core.ClusterOperatorV2Impl; import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.web.NamingResourceParser; import org.apache.commons.lang3.BooleanUtils; @@ -46,11 +48,15 @@ import javax.servlet.http.HttpServletRequest; @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/cluster") public class ClusterController { + private final UpgradeJudgement upgradeJudgement; + private final ClusterOperatorV1Impl clusterOperatorV1; private final ClusterOperatorV2Impl clusterOperatorV2; - public ClusterController(ClusterOperatorV1Impl clusterOperatorV1, ClusterOperatorV2Impl clusterOperatorV2) { + public ClusterController(UpgradeJudgement upgradeJudgement, ClusterOperatorV1Impl clusterOperatorV1, + ClusterOperatorV2Impl clusterOperatorV2) { + this.upgradeJudgement = upgradeJudgement; this.clusterOperatorV1 = clusterOperatorV1; this.clusterOperatorV2 = clusterOperatorV2; } @@ -79,8 +85,11 @@ public class ClusterController { clusterMetadata.setHealthyCheckType(healthChecker.getType()); clusterMetadata.setExtendData( UtilsAndCommons.parseMetadata(WebUtils.optional(request, "metadata", StringUtils.EMPTY))); - clusterOperatorV2.updateClusterMetadata(namespaceId, serviceName, clusterName, clusterMetadata); + judgeClusterOperator().updateClusterMetadata(namespaceId, serviceName, clusterName, clusterMetadata); return "ok"; } - + + private ClusterOperator judgeClusterOperator() { + return upgradeJudgement.isUseGrpcFeatures() ? clusterOperatorV2 : clusterOperatorV1; + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java index 1455a9cbc..98f809b19 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java @@ -25,9 +25,12 @@ import com.alibaba.nacos.auth.common.ActionTypes; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.core.utils.WebUtils; import com.alibaba.nacos.naming.core.Instance; +import com.alibaba.nacos.naming.core.InstanceOperator; import com.alibaba.nacos.naming.core.InstanceOperatorClientImpl; +import com.alibaba.nacos.naming.core.InstanceOperatorServiceImpl; import com.alibaba.nacos.naming.core.InstancePatchObject; import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.healthcheck.RsInfo; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.SwitchDomain; @@ -82,7 +85,13 @@ public class InstanceController { private ServiceManager serviceManager; @Autowired - private InstanceOperatorClientImpl instanceService; + private InstanceOperatorClientImpl instanceServiceV2; + + @Autowired + private InstanceOperatorServiceImpl instanceServiceV1; + + @Autowired + private UpgradeJudgement upgradeJudgement; /** * Register new instance. @@ -102,8 +111,8 @@ public class InstanceController { NamingUtils.checkServiceNameFormat(serviceName); final Instance instance = parseInstance(request); - - instanceService.registerInstance(namespaceId, serviceName, instance); + + getInstanceOperator().registerInstance(namespaceId, serviceName, instance); return "ok"; } @@ -122,8 +131,8 @@ public class InstanceController { String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); - - instanceService.removeInstance(namespaceId, serviceName, instance); + + getInstanceOperator().removeInstance(namespaceId, serviceName, instance); return "ok"; } @@ -141,7 +150,7 @@ public class InstanceController { String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); - instanceService.updateInstance(namespaceId, serviceName, parseInstance(request)); + getInstanceOperator().updateInstance(namespaceId, serviceName, parseInstance(request)); return "ok"; } @@ -305,7 +314,7 @@ public class InstanceController { patchObject.setEnabled(BooleanUtils.toBoolean(enabledString)); } String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); - instanceService.patchInstance(namespaceId, serviceName, patchObject); + getInstanceOperator().patchInstance(namespaceId, serviceName, patchObject); return "ok"; } @@ -336,10 +345,9 @@ public class InstanceController { String env = WebUtils.optional(request, "env", StringUtils.EMPTY); String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY); - Subscriber subscriber = - udpPort > 0 ? new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName, - udpPort, clusters) : null; - return instanceService.listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly); + Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName, + udpPort, clusters); + return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly); } /** @@ -360,7 +368,7 @@ public class InstanceController { String ip = WebUtils.required(request, "ip"); int port = Integer.parseInt(WebUtils.required(request, "port")); - com.alibaba.nacos.api.naming.pojo.Instance instance = instanceService + com.alibaba.nacos.api.naming.pojo.Instance instance = getInstanceOperator() .getInstance(namespaceId, serviceName, cluster, ip, port); ObjectNode result = JacksonUtils.createEmptyJsonNode(); result.put("service", serviceName); @@ -413,10 +421,10 @@ public class InstanceController { NamingUtils.checkServiceNameFormat(serviceName); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); - int resultCode = instanceService.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat); + int resultCode = getInstanceOperator().handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat); result.put(CommonParams.CODE, resultCode); result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, - instanceService.getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName)); + getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName)); result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; } @@ -443,7 +451,7 @@ public class InstanceController { } NamingUtils.checkServiceNameFormat(serviceName); - List ips = instanceService + List ips = getInstanceOperator() .listAllInstances(namespaceId, serviceName); ObjectNode result = JacksonUtils.createEmptyJsonNode(); @@ -516,4 +524,8 @@ public class InstanceController { return instance; } + + private InstanceOperator getInstanceOperator() { + return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1; + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java index aa41a1f1d..477a9c706 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java @@ -32,10 +32,12 @@ import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.Instance; import com.alibaba.nacos.naming.core.Service; import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.ServiceOperator; import com.alibaba.nacos.naming.core.ServiceOperatorV1Impl; import com.alibaba.nacos.naming.core.ServiceOperatorV2Impl; import com.alibaba.nacos.naming.core.SubscribeManager; import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.pojo.Subscriber; @@ -93,6 +95,9 @@ public class ServiceController { @Autowired private ServiceOperatorV2Impl serviceOperatorV2; + @Autowired + private UpgradeJudgement upgradeJudgement; + /** * Create a new service. This API will create a persistence service. * @@ -115,7 +120,7 @@ public class ServiceController { serviceMetadata.setSelector(parseSelector(selector)); serviceMetadata.setExtendData(UtilsAndCommons.parseMetadata(metadata)); serviceMetadata.setEphemeral(false); - serviceOperatorV2.create(namespaceId, serviceName, serviceMetadata); + getServiceOperator().create(namespaceId, serviceName, serviceMetadata); return "ok"; } @@ -131,8 +136,8 @@ public class ServiceController { @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String remove(@RequestParam(defaultValue = Constants.DEFAULT_NAMESPACE_ID) String namespaceId, @RequestParam String serviceName) throws Exception { - - serviceOperatorV2.delete(namespaceId, serviceName); + + getServiceOperator().delete(namespaceId, serviceName); return "ok"; } @@ -193,7 +198,7 @@ public class ServiceController { String groupName = WebUtils.optional(request, CommonParams.GROUP_NAME, Constants.DEFAULT_GROUP); String selectorString = WebUtils.optional(request, "selector", StringUtils.EMPTY); ObjectNode result = JacksonUtils.createEmptyJsonNode(); - List serviceNameList = serviceOperatorV2 + List serviceNameList = getServiceOperator() .listService(namespaceId, groupName, selectorString, pageSize, pageNo); result.replace("doms", JacksonUtils.transferToJsonNode(serviceNameList)); result.put("count", serviceNameList.size()); @@ -221,7 +226,7 @@ public class ServiceController { com.alibaba.nacos.naming.core.v2.pojo.Service service = com.alibaba.nacos.naming.core.v2.pojo.Service .newService(namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName)); - serviceOperatorV2.update(service, serviceMetadata); + getServiceOperator().update(service, serviceMetadata); return "ok"; } @@ -457,4 +462,8 @@ public class ServiceController { throw new NacosException(NacosException.INVALID_PARAM, "not match any type of selector!"); } } + + private ServiceOperator getServiceOperator() { + return upgradeJudgement.isUseGrpcFeatures() ? serviceOperatorV2 : serviceOperatorV1; + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/CatalogServiceV1Impl.java b/naming/src/main/java/com/alibaba/nacos/naming/core/CatalogServiceV1Impl.java index 66672b194..513a6bbcf 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/CatalogServiceV1Impl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/CatalogServiceV1Impl.java @@ -74,7 +74,7 @@ public class CatalogServiceV1Impl implements CatalogService { @Override public List listInstances(String namespaceId, String groupName, String serviceName, String clusterName) throws NacosException { - Service service = serviceManager.getService(namespaceId, serviceName); + Service service = serviceManager.getService(namespaceId, NamingUtils.getGroupedName(serviceName, groupName)); if (service == null) { throw new NacosException(NacosException.NOT_FOUND, String.format("service %s@@%s is not found!", groupName, serviceName)); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorClientImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorClientImpl.java index e95c0bc96..44c93f4eb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorClientImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorClientImpl.java @@ -160,7 +160,7 @@ public class InstanceOperatorClientImpl implements InstanceOperator { public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster, boolean healthOnly) { Service service = getService(namespaceId, serviceName, true); - if (null != subscriber) { + if (subscriber.getPort() > 0) { String clientId = IpPortBasedClient.getClientId(subscriber.getAddrStr(), true); createIpPortClientIfAbsent(clientId, true); clientOperationService.subscribeService(service, subscriber, clientId); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorServiceImpl.java index 1f197f386..68bf39573 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorServiceImpl.java @@ -311,7 +311,7 @@ public class InstanceOperatorServiceImpl implements InstanceOperator { public long getHeartBeatInterval(String namespaceId, String serviceName, String ip, int port, String cluster) { com.alibaba.nacos.naming.core.Instance instance = serviceManager .getInstance(namespaceId, serviceName, cluster, ip, port); - if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) { + if (null != instance && instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) { return instance.getInstanceHeartBeatInterval(); } return switchDomain.getClientBeatInterval(); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java b/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java index fc7c8c334..7c14d5b23 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java @@ -16,12 +16,13 @@ package com.alibaba.nacos.naming.core; -import com.alibaba.nacos.common.utils.JacksonUtils; -import com.alibaba.nacos.sys.utils.ApplicationUtils; -import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.api.common.Constants; +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.naming.consistency.RecordListener; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task; import com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask; import com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor; import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor; @@ -32,10 +33,10 @@ import com.alibaba.nacos.naming.pojo.Record; import com.alibaba.nacos.naming.push.UdpPushService; import com.alibaba.nacos.naming.selector.NoneSelector; import com.alibaba.nacos.naming.selector.Selector; +import com.alibaba.nacos.sys.utils.ApplicationUtils; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; - import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.ListUtils; import org.apache.commons.lang3.StringUtils; @@ -52,8 +53,7 @@ import java.util.Map; * Service of Nacos server side * *

We introduce a 'service --> cluster --> instance' model, in which service stores a list of clusters, which - * contain - * a list of instances. + * contain a list of instances. * *

his class inherits from Service in API module and stores some fields that do not have to expose to client. * @@ -277,6 +277,9 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement setLastModifiedMillis(System.currentTimeMillis()); getPushService().serviceChanged(this); + ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class) + .addTask(ServiceChangeV1Task.getKey(namespaceId, getName(), ephemeral), + new ServiceChangeV1Task(namespaceId, getName(), ephemeral)); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ClusterMetadata.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ClusterMetadata.java index f2ecda26c..14f0c441e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ClusterMetadata.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ClusterMetadata.java @@ -41,7 +41,7 @@ public class ClusterMetadata implements Serializable { /** * Whether or not use instance port to do health check. */ - private boolean useInstancePortForCheck = false; + private boolean useInstancePortForCheck = true; private Map extendData = new ConcurrentHashMap<>(1); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java new file mode 100644 index 000000000..4becc264d --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java @@ -0,0 +1,189 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade; + +import com.alibaba.nacos.common.JustForTest; +import com.alibaba.nacos.common.executor.ExecutorFactory; +import com.alibaba.nacos.common.executor.NameThreadFactory; +import com.alibaba.nacos.common.notify.Event; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.notify.listener.Subscriber; +import com.alibaba.nacos.core.cluster.Member; +import com.alibaba.nacos.core.cluster.MemberMetaDataConstants; +import com.alibaba.nacos.core.cluster.MembersChangeEvent; +import com.alibaba.nacos.core.cluster.ServerMemberManager; +import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; +import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore; +import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet; +import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; +import com.alibaba.nacos.naming.misc.Loggers; +import com.alibaba.nacos.naming.monitor.MetricsMonitor; +import org.codehaus.jackson.Version; +import org.codehaus.jackson.util.VersionUtil; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Ability judgement during upgrading. + * + * @author xiweng.yy + */ +@Component +public class UpgradeJudgement extends Subscriber { + + /** + * Only when all cluster upgrade upper 2.0.0, this features is true. + */ + private final AtomicBoolean useGrpcFeatures = new AtomicBoolean(false); + + /** + * Only when all cluster upgrade upper 1.4.0, this features is true. + */ + private final AtomicBoolean useJraftFeatures = new AtomicBoolean(false); + + private final RaftPeerSet raftPeerSet; + + private final RaftCore raftCore; + + private final ClusterVersionJudgement versionJudgement; + + private final ServerMemberManager memberManager; + + private final ServiceManager serviceManager; + + private final DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine; + + private final ScheduledExecutorService upgradeChecker; + + public UpgradeJudgement(RaftPeerSet raftPeerSet, RaftCore raftCore, ClusterVersionJudgement versionJudgement, + ServerMemberManager memberManager, ServiceManager serviceManager, + DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine) { + this.raftPeerSet = raftPeerSet; + this.raftCore = raftCore; + this.versionJudgement = versionJudgement; + this.memberManager = memberManager; + this.serviceManager = serviceManager; + this.doubleWriteDelayTaskEngine = doubleWriteDelayTaskEngine; + upgradeChecker = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory("upgrading.checker")); + upgradeChecker.scheduleAtFixedRate(() -> { + if (isUseGrpcFeatures()) { + return; + } + boolean canUpgrade = checkForUpgrade(); + Loggers.SRV_LOG.info("upgrade check result {}", canUpgrade); + if (canUpgrade) { + doUpgrade(); + } + }, 100L, 5000L, TimeUnit.MILLISECONDS); + NotifyCenter.registerSubscriber(this); + } + + @JustForTest + void setUseGrpcFeatures(boolean value) { + useGrpcFeatures.set(value); + } + + @JustForTest + void setUseJraftFeatures(boolean value) { + useJraftFeatures.set(value); + } + + public boolean isUseGrpcFeatures() { + return useGrpcFeatures.get(); + } + + public boolean isUseJraftFeatures() { + return useJraftFeatures.get(); + } + + @Override + public void onEvent(MembersChangeEvent event) { + Loggers.SRV_LOG.info("member change, new members {}", event.getMembers()); + for (Member each : event.getMembers()) { + Object versionStr = each.getExtendVal(MemberMetaDataConstants.VERSION); + // come from below 1.3.0 + if (null == versionStr) { + checkAndDowngrade(false); + return; + } + Version version = VersionUtil.parseVersion(versionStr.toString()); + if (version.getMajorVersion() < 2) { + checkAndDowngrade(version.getMinorVersion() >= 4); + return; + } + } + } + + private void checkAndDowngrade(boolean jraftFeature) { + boolean isDowngradeGrpc = useGrpcFeatures.compareAndSet(true, false); + boolean isDowngradeJraft = useJraftFeatures.getAndSet(jraftFeature); + if (isDowngradeGrpc && isDowngradeJraft && !jraftFeature) { + Loggers.SRV_LOG.info("Downgrade to 1.X"); + try { + raftPeerSet.init(); + raftCore.init(); + versionJudgement.reset(); + } catch (Exception e) { + Loggers.SRV_LOG.error("Downgrade rafe failed ", e); + } + } + } + + private boolean checkForUpgrade() { + if (!useGrpcFeatures.get()) { + boolean selfCheckResult = checkServiceAndInstanceNumber() && checkDoubleWriteStatus(); + Member self = memberManager.getSelf(); + self.setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, selfCheckResult); + memberManager.updateMember(self); + } + boolean result = true; + for (Member each : memberManager.allMembers()) { + Object isReadyToUpgrade = each.getExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE); + result &= null != isReadyToUpgrade && (boolean) isReadyToUpgrade; + } + return result; + } + + private boolean checkServiceAndInstanceNumber() { + boolean result = serviceManager.getServiceCount() == MetricsMonitor.getDomCountMonitor().get(); + result &= serviceManager.getInstanceCount() == MetricsMonitor.getIpCountMonitor().get(); + return result; + } + + private boolean checkDoubleWriteStatus() { + return doubleWriteDelayTaskEngine.isEmpty(); + } + + private void doUpgrade() { + Loggers.SRV_LOG.info("Upgrade to 2.0.X"); + useGrpcFeatures.compareAndSet(false, true); + useJraftFeatures.set(true); + } + + @Override + public Class subscribeType() { + return MembersChangeEvent.class; + } + + public void shutdown() { + upgradeChecker.shutdownNow(); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteDelayTaskEngine.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteDelayTaskEngine.java new file mode 100644 index 000000000..dc6d91d36 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/DoubleWriteDelayTaskEngine.java @@ -0,0 +1,42 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay; + +import com.alibaba.nacos.common.task.NacosTaskProcessor; +import com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine; +import com.alibaba.nacos.naming.misc.Loggers; +import org.springframework.stereotype.Component; + +/** + * Double Write task engine. + * + * @author xiweng.yy + */ +@Component +public class DoubleWriteDelayTaskEngine extends NacosDelayTaskExecuteEngine { + + public DoubleWriteDelayTaskEngine() { + super(DoubleWriteDelayTaskEngine.class.getSimpleName(), Loggers.SRV_LOG); + addProcessor("v1", new ServiceChangeV1Task.ServiceChangeV1TaskProcessor()); + } + + @Override + public NacosTaskProcessor getProcessor(Object key) { + String actualKey = key.toString().split(":")[0]; + return super.getProcessor(actualKey); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/ServiceChangeV1Task.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/ServiceChangeV1Task.java new file mode 100644 index 000000000..ec86856be --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/delay/ServiceChangeV1Task.java @@ -0,0 +1,148 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay; + +import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; +import com.alibaba.nacos.common.task.NacosTaskProcessor; +import com.alibaba.nacos.naming.core.Cluster; +import com.alibaba.nacos.naming.core.Instance; +import com.alibaba.nacos.naming.core.Service; +import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient; +import com.alibaba.nacos.naming.core.v2.index.ServiceStorage; +import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata; +import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute.DoubleWriteInstanceChangeToV2Task; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute.DoubleWriteMetadataChangeToV2Task; +import com.alibaba.nacos.naming.misc.Loggers; +import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher; +import com.alibaba.nacos.sys.utils.ApplicationUtils; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Double write delay task during upgrading. + * + * @author xiweng.yy + */ +public class ServiceChangeV1Task extends AbstractDelayTask { + + private final String namespace; + + private final String serviceName; + + private final boolean ephemeral; + + public ServiceChangeV1Task(String namespace, String serviceName, boolean ephemeral) { + this.namespace = namespace; + this.serviceName = serviceName; + this.ephemeral = ephemeral; + setLastProcessTime(System.currentTimeMillis()); + setTaskInterval(1000L); + } + + @Override + public void merge(AbstractDelayTask task) { + } + + public String getNamespace() { + return namespace; + } + + public String getServiceName() { + return serviceName; + } + + public boolean isEphemeral() { + return ephemeral; + } + + public static String getKey(String namespace, String serviceName, boolean ephemeral) { + return "v1:" + namespace + "_" + serviceName + "_" + ephemeral; + } + + public static class ServiceChangeV1TaskProcessor implements NacosTaskProcessor { + + @Override + public boolean process(NacosTask task) { + ServiceChangeV1Task serviceTask = (ServiceChangeV1Task) task; + Loggers.SRV_LOG.info("double write for service {}", serviceTask.getServiceName()); + ServiceManager serviceManager = ApplicationUtils.getBean(ServiceManager.class); + Service service = serviceManager.getService(serviceTask.getNamespace(), serviceTask.getServiceName()); + ServiceMetadata serviceMetadata = parseServiceMetadata(service, serviceTask.isEphemeral()); + DoubleWriteMetadataChangeToV2Task metadataTask = new DoubleWriteMetadataChangeToV2Task( + service.getNamespaceId(), service.getName(), serviceTask.isEphemeral(), serviceMetadata); + NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service.getName(), metadataTask); + ServiceStorage serviceStorage = ApplicationUtils.getBean(ServiceStorage.class); + ServiceInfo serviceInfo = serviceStorage.getPushData(transfer(service, serviceTask.isEphemeral())); + List newInstance = service.allIPs(serviceTask.isEphemeral()); + Set instances = new HashSet<>(); + for (Instance each : newInstance) { + instances.add(each.toIpAddr()); + DoubleWriteInstanceChangeToV2Task instanceTask = new DoubleWriteInstanceChangeToV2Task( + service.getNamespaceId(), service.getName(), each, true); + NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask( + IpPortBasedClient.getClientId(each.toIpAddr(), serviceTask.isEphemeral()), instanceTask); + } + List oldInstance = serviceInfo.getHosts(); + Loggers.SRV_LOG.info("[TMP_DEBUG] oldInstances {} \n new Instances {}", oldInstance, newInstance); + for (com.alibaba.nacos.api.naming.pojo.Instance each : oldInstance) { + if (!instances.contains(each.toInetAddr())) { + DoubleWriteInstanceChangeToV2Task instanceTask = new DoubleWriteInstanceChangeToV2Task( + service.getNamespaceId(), service.getName(), each, false); + NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask( + IpPortBasedClient.getClientId(each.toInetAddr(), serviceTask.isEphemeral()), instanceTask); + } + } + return true; + } + + private com.alibaba.nacos.naming.core.v2.pojo.Service transfer(Service service, boolean ephemeral) { + return com.alibaba.nacos.naming.core.v2.pojo.Service + .newService(service.getNamespaceId(), service.getGroupName(), + NamingUtils.getServiceName(service.getName()), ephemeral); + } + + private ServiceMetadata parseServiceMetadata(Service service, boolean ephemeral) { + ServiceMetadata result = new ServiceMetadata(); + result.setEphemeral(ephemeral); + result.setProtectThreshold(service.getProtectThreshold()); + result.setSelector(service.getSelector()); + result.setExtendData(service.getMetadata()); + for (Map.Entry entry : service.getClusterMap().entrySet()) { + result.getClusters().put(entry.getKey(), parseClusterMetadata(entry.getValue())); + } + return result; + } + + private ClusterMetadata parseClusterMetadata(Cluster cluster) { + ClusterMetadata result = new ClusterMetadata(); + result.setHealthyCheckPort(cluster.getDefCkport()); + result.setUseInstancePortForCheck(cluster.isUseIPPort4Check()); + result.setExtendData(cluster.getMetadata()); + result.setHealthChecker(cluster.getHealthChecker()); + result.setHealthyCheckType(cluster.getHealthChecker().getType()); + return result; + } + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV2Task.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV2Task.java new file mode 100644 index 000000000..37807ad0e --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteInstanceChangeToV2Task.java @@ -0,0 +1,65 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute; + +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.naming.core.InstanceOperatorClientImpl; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task; +import com.alibaba.nacos.sys.utils.ApplicationUtils; + +/** + * Double write from 1.x to 2 task. + * + * @author xiweng.yy + */ +public class DoubleWriteInstanceChangeToV2Task extends AbstractExecuteTask { + + private final String namespace; + + private final String serviceName; + + private final Instance instance; + + private final boolean register; + + public DoubleWriteInstanceChangeToV2Task(String namespace, String serviceName, Instance instance, + boolean register) { + this.register = register; + this.namespace = namespace; + this.serviceName = serviceName; + this.instance = instance; + } + + @Override + public void run() { + try { + InstanceOperatorClientImpl instanceOperator = ApplicationUtils.getBean(InstanceOperatorClientImpl.class); + if (register) { + instanceOperator.registerInstance(namespace, serviceName, instance); + } else { + instanceOperator.removeInstance(namespace, serviceName, instance); + } + } catch (Exception e) { + ServiceChangeV1Task retryTask = new ServiceChangeV1Task(namespace, serviceName, instance.isEphemeral()); + retryTask.setTaskInterval(3000L); + String taskKey = ServiceChangeV1Task.getKey(namespace, serviceName, instance.isEphemeral()); + ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask); + } + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV2Task.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV2Task.java new file mode 100644 index 000000000..dcd56c30c --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/doublewrite/execute/DoubleWriteMetadataChangeToV2Task.java @@ -0,0 +1,83 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute; + +import com.alibaba.nacos.api.naming.pojo.healthcheck.HealthCheckType; +import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata; +import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService; +import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata; +import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task; +import com.alibaba.nacos.naming.selector.NoneSelector; +import com.alibaba.nacos.sys.utils.ApplicationUtils; + +import java.util.Map; + +/** + * Double write from 1.x to 2 task. + * + * @author xiweng.yy + */ +public class DoubleWriteMetadataChangeToV2Task extends AbstractExecuteTask { + + private final Service service; + + private final ServiceMetadata serviceMetadata; + + public DoubleWriteMetadataChangeToV2Task(String namespace, String serviceName, boolean ephemeral, + ServiceMetadata serviceMetadata) { + this.serviceMetadata = serviceMetadata; + String groupName = NamingUtils.getGroupName(serviceName); + String serviceNameWithoutGroup = NamingUtils.getServiceName(serviceName); + this.service = Service.newService(namespace, groupName, serviceNameWithoutGroup, ephemeral); + } + + @Override + public void run() { + try { + NamingMetadataOperateService metadataOperate = ApplicationUtils.getBean(NamingMetadataOperateService.class); + if (!isDefaultServiceMetadata()) { + metadataOperate.updateServiceMetadata(service, serviceMetadata); + } + for (Map.Entry entry : serviceMetadata.getClusters().entrySet()) { + if (!isDefaultClusterMetadata(entry.getValue())) { + metadataOperate.addClusterMetadata(service, entry.getKey(), entry.getValue()); + } + } + } catch (Exception e) { + ServiceChangeV1Task retryTask = new ServiceChangeV1Task(service.getNamespace(), + service.getGroupedServiceName(), service.isEphemeral()); + retryTask.setTaskInterval(3000L); + String taskKey = ServiceChangeV1Task + .getKey(service.getNamespace(), service.getGroupedServiceName(), service.isEphemeral()); + ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask); + } + } + + private boolean isDefaultServiceMetadata() { + return serviceMetadata.getExtendData().isEmpty() && serviceMetadata.getProtectThreshold() == 0.0F + && serviceMetadata.getSelector() instanceof NoneSelector && serviceMetadata.isEphemeral(); + } + + private boolean isDefaultClusterMetadata(ClusterMetadata metadata) { + return HealthCheckType.TCP.name().equals(metadata.getHealthyCheckType()) && metadata.getExtendData().isEmpty() + && metadata.getHealthyCheckPort() == 80 && metadata.isUseInstancePortForCheck(); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java index 21543d0c1..4811f08f1 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java @@ -24,6 +24,7 @@ import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.Instance; import com.alibaba.nacos.naming.core.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.healthcheck.heartbeat.BeatCheckTask; import com.alibaba.nacos.naming.misc.GlobalConfig; import com.alibaba.nacos.naming.misc.HttpClient; @@ -77,6 +78,10 @@ public class ClientBeatCheckTask implements BeatCheckTask { @Override public void run() { try { + // If upgrade to 2.0.X stop health check with v1 + if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) { + return; + } if (!getDistroMapper().responsible(service.getName())) { return; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckTask.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckTask.java index 8599fb8ce..392736374 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckTask.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.naming.healthcheck; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.sys.utils.ApplicationUtils; import com.alibaba.nacos.naming.core.Cluster; import com.alibaba.nacos.naming.core.DistroMapper; @@ -77,6 +78,10 @@ public class HealthCheckTask implements Runnable { public void run() { try { + // If upgrade to 2.0.X stop health check with v1 + if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) { + return; + } if (distroMapper.responsible(cluster.getService().getName()) && switchDomain .isHealthCheckEnabled(cluster.getService().getName())) { healthCheckProcessor.process(this); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ClientBeatCheckTaskV2.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ClientBeatCheckTaskV2.java index 21aadcf37..5f9217480 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ClientBeatCheckTaskV2.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ClientBeatCheckTaskV2.java @@ -81,7 +81,11 @@ public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCh } @Override - public void afterIntercept() { + public void passIntercept() { doHealthCheck(); } + + @Override + public void afterIntercept() { + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ClientBeatProcessorV2.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ClientBeatProcessorV2.java index 448dc98f9..632965bd0 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ClientBeatProcessorV2.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ClientBeatProcessorV2.java @@ -19,6 +19,7 @@ package com.alibaba.nacos.naming.healthcheck.heartbeat; import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient; +import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent; import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent; import com.alibaba.nacos.naming.core.v2.pojo.HealthCheckInstancePublishInfo; import com.alibaba.nacos.naming.core.v2.pojo.Service; @@ -66,6 +67,7 @@ public class ClientBeatProcessorV2 implements BeatProcessor { Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE); NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service)); + NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client)); } } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/InstanceBeatCheckTask.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/InstanceBeatCheckTask.java index 592886207..d8f9dd158 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/InstanceBeatCheckTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/InstanceBeatCheckTask.java @@ -53,12 +53,16 @@ public class InstanceBeatCheckTask implements Interceptable { } @Override - public void afterIntercept() { + public void passIntercept() { for (InstanceBeatChecker each : CHECKERS) { each.doCheck(client, service, instancePublishInfo); } } + @Override + public void afterIntercept() { + } + public Client getClient() { return client; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/interceptor/HealthCheckEnableInterceptor.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/interceptor/HealthCheckEnableInterceptor.java index f6337763c..8e43dd2d4 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/interceptor/HealthCheckEnableInterceptor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/interceptor/HealthCheckEnableInterceptor.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.naming.healthcheck.interceptor; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.healthcheck.NacosHealthCheckTask; import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.sys.utils.ApplicationUtils; @@ -29,7 +30,12 @@ public class HealthCheckEnableInterceptor extends AbstractHealthCheckInterceptor @Override public boolean intercept(NacosHealthCheckTask object) { - return !ApplicationUtils.getBean(SwitchDomain.class).isHealthCheckEnabled(); + try { + return !ApplicationUtils.getBean(SwitchDomain.class).isHealthCheckEnabled() || !ApplicationUtils + .getBean(UpgradeJudgement.class).isUseGrpcFeatures(); + } catch (Exception e) { + return true; + } } @Override diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/HealthCheckTaskV2.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/HealthCheckTaskV2.java index 99b90598b..fd408818c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/HealthCheckTaskV2.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/HealthCheckTaskV2.java @@ -127,10 +127,17 @@ public class HealthCheckTaskV2 extends AbstractExecuteTask implements NacosHealt } @Override - public void afterIntercept() { + public void passIntercept() { doHealthCheck(); } + @Override + public void afterIntercept() { + if (!cancelled) { + HealthCheckReactor.scheduleCheck(this); + } + } + @Override public void run() { doHealthCheck(); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/processor/HealthCheckCommonV2.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/processor/HealthCheckCommonV2.java index e7466edf4..93f438a87 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/processor/HealthCheckCommonV2.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/processor/HealthCheckCommonV2.java @@ -96,7 +96,7 @@ public class HealthCheckCommonV2 { String clusterName = instance.getExtendDatum().get(CommonParams.CLUSTER_NAME).toString(); if (instance.getOkCount().incrementAndGet() >= switchDomain.getCheckTimes()) { if (switchDomain.isHealthCheckEnabled(serviceName) && !task.isCancelled() && distroMapper - .responsible(serviceName)) { + .responsible(task.getClient().getResponsibleId())) { healthStatusSynchronizer.instanceHealthStatusChange(true, task.getClient(), service, instance); Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}", serviceName, instance.getIp(), instance.getPort(), clusterName, @@ -130,7 +130,7 @@ public class HealthCheckCommonV2 { String clusterName = instance.getExtendDatum().get(CommonParams.CLUSTER_NAME).toString(); if (instance.getFailCount().incrementAndGet() >= switchDomain.getCheckTimes()) { if (switchDomain.isHealthCheckEnabled(serviceName) && !task.isCancelled() && distroMapper - .responsible(serviceName)) { + .responsible(task.getClient().getResponsibleId())) { healthStatusSynchronizer.instanceHealthStatusChange(false, task.getClient(), service, instance); Loggers.EVT_LOG .info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}", @@ -165,7 +165,7 @@ public class HealthCheckCommonV2 { String serviceName = service.getGroupedServiceName(); String clusterName = instance.getExtendDatum().get(CommonParams.CLUSTER_NAME).toString(); if (switchDomain.isHealthCheckEnabled(serviceName) && !task.isCancelled() && distroMapper - .responsible(serviceName)) { + .responsible(task.getClient().getResponsibleId())) { healthStatusSynchronizer.instanceHealthStatusChange(false, task.getClient(), service, instance); Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}", serviceName, instance.getIp(), instance.getPort(), clusterName, diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/processor/TcpHealthCheckProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/processor/TcpHealthCheckProcessor.java index c4ee8de52..af85acd77 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/processor/TcpHealthCheckProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/v2/processor/TcpHealthCheckProcessor.java @@ -279,7 +279,7 @@ public class TcpHealthCheckProcessor implements HealthCheckProcessorV2, Runnable healthCheckCommon.checkFail(task, service, msg); } - keyMap.remove(task.toString()); + keyMap.remove(toString()); } healthCheckCommon.reEvaluateCheckRT(rt, task, switchDomain.getTcpHealthParams()); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/interceptor/AbstractNamingInterceptorChain.java b/naming/src/main/java/com/alibaba/nacos/naming/interceptor/AbstractNamingInterceptorChain.java index 1302c22f2..0986009c0 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/interceptor/AbstractNamingInterceptorChain.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/interceptor/AbstractNamingInterceptorChain.java @@ -60,9 +60,10 @@ public abstract class AbstractNamingInterceptorChain continue; } if (each.intercept(object)) { + object.afterIntercept(); return; } } - object.afterIntercept(); + object.passIntercept(); } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/interceptor/Interceptable.java b/naming/src/main/java/com/alibaba/nacos/naming/interceptor/Interceptable.java index d0b3da111..ba893f4af 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/interceptor/Interceptable.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/interceptor/Interceptable.java @@ -26,5 +26,10 @@ public interface Interceptable { /** * If no {@link NacosNamingInterceptor} intercept this object, this method will be called to execute. */ + void passIntercept(); + + /** + * If one {@link NacosNamingInterceptor} intercept this object, this method will be called. + */ void afterIntercept(); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/push/UdpPushService.java b/naming/src/main/java/com/alibaba/nacos/naming/push/UdpPushService.java index a40a78773..f8e1cacf7 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/push/UdpPushService.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/push/UdpPushService.java @@ -20,6 +20,7 @@ import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.remote.PushCallBack; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.naming.core.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.misc.GlobalExecutor; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.SwitchDomain; @@ -34,6 +35,7 @@ import com.alibaba.nacos.naming.remote.udp.AckEntry; import com.alibaba.nacos.naming.remote.udp.AckPacket; import com.alibaba.nacos.naming.remote.udp.UdpConnector; import com.alibaba.nacos.naming.utils.Constants; +import com.alibaba.nacos.sys.utils.ApplicationUtils; import org.apache.commons.collections.MapUtils; import org.codehaus.jackson.util.VersionUtil; import org.springframework.beans.BeansException; @@ -114,6 +116,10 @@ public class UdpPushService implements ApplicationContextAware, ApplicationListe @Override public void onApplicationEvent(ServiceChangeEvent event) { + // If upgrade to 2.0.X, do not push for v1. + if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) { + return; + } Service service = event.getService(); String serviceName = service.getName(); String namespaceId = service.getNamespaceId(); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2Impl.java b/naming/src/main/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2Impl.java index 3178b1d4e..9ba7ed06c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2Impl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2Impl.java @@ -26,6 +26,7 @@ import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent; import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager; import com.alibaba.nacos.naming.core.v2.index.ServiceStorage; import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.push.NamingSubscriberService; import com.alibaba.nacos.naming.push.v2.executor.PushExecutorDelegate; @@ -54,11 +55,14 @@ public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements Na private final PushDelayTaskExecuteEngine delayTaskEngine; + private final UpgradeJudgement upgradeJudgement; + public NamingSubscriberServiceV2Impl(ClientManagerDelegate clientManager, ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage, - PushExecutorDelegate pushExecutor) { + PushExecutorDelegate pushExecutor, UpgradeJudgement upgradeJudgement) { this.clientManager = clientManager; this.indexesManager = indexesManager; + this.upgradeJudgement = upgradeJudgement; this.delayTaskEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage, pushExecutor); NotifyCenter.registerSubscriber(this); @@ -106,6 +110,9 @@ public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements Na @Override public void onEvent(Event event) { + if (!upgradeJudgement.isUseGrpcFeatures()) { + return; + } ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event; Service service = serviceChangedEvent.getService(); delayTaskEngine.addTask(service, new PushDelayTask(service, 500L)); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/web/DistroFilter.java b/naming/src/main/java/com/alibaba/nacos/naming/web/DistroFilter.java index 42ac029c3..57d3cdba8 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/web/DistroFilter.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/web/DistroFilter.java @@ -92,46 +92,49 @@ public class DistroFilter implements Filter { throw new NoSuchMethodException(req.getMethod() + " " + path); } + if (!method.isAnnotationPresent(CanDistro.class)) { + filterChain.doFilter(req, resp); + return; + } String distroTag = distroTagGenerator.getResponsibleTag(req); - // proxy request to other server if necessary: - if (method.isAnnotationPresent(CanDistro.class) && !distroMapper.responsible(distroTag)) { - - String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER); - - if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) { - // This request is sent from peer server, should not be redirected again: - Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr()); - resp.sendError(HttpServletResponse.SC_BAD_REQUEST, - "receive invalid redirect request from peer " + req.getRemoteAddr()); - return; - } - - final String targetServer = distroMapper.mapSrv(distroTag); - - List headerList = new ArrayList<>(16); - Enumeration headers = req.getHeaderNames(); - while (headers.hasMoreElements()) { - String headerName = headers.nextElement(); - headerList.add(headerName); - headerList.add(req.getHeader(headerName)); - } - - final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name()); - final Map paramsValue = HttpClient.translateParameterMap(req.getParameterMap()); - - RestResult result = HttpClient - .request("http://" + targetServer + req.getRequestURI(), headerList, paramsValue, body, - PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod()); - String data = result.ok() ? result.getData() : result.getMessage(); - try { - WebUtils.response(resp, data, result.getCode()); - } catch (Exception ignore) { - Loggers.SRV_LOG - .warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString); - } - } else { + if (distroMapper.responsible(distroTag)) { filterChain.doFilter(req, resp); + return; + } + + // proxy request to other server if necessary: + String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER); + + if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) { + // This request is sent from peer server, should not be redirected again: + Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, + "receive invalid redirect request from peer " + req.getRemoteAddr()); + return; + } + + final String targetServer = distroMapper.mapSrv(distroTag); + + List headerList = new ArrayList<>(16); + Enumeration headers = req.getHeaderNames(); + while (headers.hasMoreElements()) { + String headerName = headers.nextElement(); + headerList.add(headerName); + headerList.add(req.getHeader(headerName)); + } + + final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name()); + final Map paramsValue = HttpClient.translateParameterMap(req.getParameterMap()); + + RestResult result = HttpClient + .request("http://" + targetServer + req.getRequestURI(), headerList, paramsValue, body, + PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod()); + String data = result.ok() ? result.getData() : result.getMessage(); + try { + WebUtils.response(resp, data, result.getCode()); + } catch (Exception ignore) { + Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString); } } catch (AccessControlException e) { resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e)); @@ -147,6 +150,6 @@ public class DistroFilter implements Filter { @Override public void destroy() { - + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/web/DistroTagGeneratorImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/web/DistroTagGeneratorImpl.java index 4794ffe54..cd2ba4339 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/web/DistroTagGeneratorImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/web/DistroTagGeneratorImpl.java @@ -16,10 +16,8 @@ package com.alibaba.nacos.naming.web; -import com.alibaba.nacos.core.cluster.Member; -import com.alibaba.nacos.core.cluster.MemberMetaDataConstants; -import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.utils.ReuseHttpServletRequest; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import org.springframework.stereotype.Component; /** @@ -30,14 +28,14 @@ import org.springframework.stereotype.Component; @Component public class DistroTagGeneratorImpl implements DistroTagGenerator { - private final ServerMemberManager serverMemberManager; - private final DistroTagGenerator serviceNameTag = new DistroServiceNameTagGenerator(); private final DistroTagGenerator ipPortTag = new DistroIpPortTagGenerator(); - public DistroTagGeneratorImpl(ServerMemberManager serverMemberManager) { - this.serverMemberManager = serverMemberManager; + private final UpgradeJudgement upgradeJudgement; + + public DistroTagGeneratorImpl(UpgradeJudgement upgradeJudgement) { + this.upgradeJudgement = upgradeJudgement; } @Override @@ -56,11 +54,6 @@ public class DistroTagGeneratorImpl implements DistroTagGenerator { * @return actual tag generator */ private DistroTagGenerator getTagGenerator() { - for (Member each : serverMemberManager.allMembers()) { - if (null == each.getExtendVal(MemberMetaDataConstants.SUPPORT_REMOTE_C_TYPE)) { - return serviceNameTag; - } - } - return ipPortTag; + return upgradeJudgement.isUseGrpcFeatures() ? ipPortTag : serviceNameTag; } } diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java new file mode 100644 index 000000000..244e25496 --- /dev/null +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java @@ -0,0 +1,228 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade; + +import com.alibaba.nacos.common.utils.StringUtils; +import com.alibaba.nacos.core.cluster.Member; +import com.alibaba.nacos.core.cluster.MemberMetaDataConstants; +import com.alibaba.nacos.core.cluster.MembersChangeEvent; +import com.alibaba.nacos.core.cluster.ServerMemberManager; +import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; +import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore; +import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet; +import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; +import com.alibaba.nacos.sys.env.EnvUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.core.env.ConfigurableEnvironment; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class UpgradeJudgementTest { + + @Mock + private ConfigurableEnvironment environment; + + @Mock + private RaftPeerSet raftPeerSet; + + @Mock + private RaftCore raftCore; + + @Mock + private ClusterVersionJudgement versionJudgement; + + @Mock + private ServerMemberManager memberManager; + + @Mock + private ServiceManager serviceManager; + + @Mock + private DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine; + + private UpgradeJudgement upgradeJudgement; + + @Before + public void setUp() throws Exception { + EnvUtil.setEnvironment(environment); + upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager, + doubleWriteDelayTaskEngine); + } + + @After + public void tearDown() { + upgradeJudgement.shutdown(); + } + + @Test + public void testUpgradeOneNode() throws Exception { + Collection members = mockMember("1.3.2", "1.3.2", "2.0.0"); + upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build()); + verify(raftPeerSet, never()).init(); + verify(raftCore, never()).init(); + verify(versionJudgement, never()).reset(); + TimeUnit.MILLISECONDS.sleep(500); + assertFalse(upgradeJudgement.isUseGrpcFeatures()); + assertFalse(upgradeJudgement.isUseJraftFeatures()); + } + + @Test + public void testUpgradeOneFor14XNode() throws Exception { + Collection members = mockMember("1.4.0", "2.0.0", "2.0.0"); + upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build()); + verify(raftPeerSet, never()).init(); + verify(raftCore, never()).init(); + verify(versionJudgement, never()).reset(); + TimeUnit.MILLISECONDS.sleep(500); + assertFalse(upgradeJudgement.isUseGrpcFeatures()); + assertTrue(upgradeJudgement.isUseJraftFeatures()); + } + + @Test + public void testUpgradeTwoNode() throws Exception { + Collection members = mockMember("", "2.0.0", "2.0.0"); + upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build()); + verify(raftPeerSet, never()).init(); + verify(raftCore, never()).init(); + verify(versionJudgement, never()).reset(); + TimeUnit.MILLISECONDS.sleep(500); + assertFalse(upgradeJudgement.isUseGrpcFeatures()); + assertFalse(upgradeJudgement.isUseJraftFeatures()); + } + + @Test + public void testUpgradeCheckSucc() throws Exception { + Collection members = mockMember("2.0.0-snapshot", "2.0.0", "2.0.0"); + Iterator iterator = members.iterator(); + when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(true); + iterator.next(); + while (iterator.hasNext()) { + iterator.next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true); + } + upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build()); + verify(raftPeerSet, never()).init(); + verify(raftCore, never()).init(); + verify(versionJudgement, never()).reset(); + TimeUnit.MILLISECONDS.sleep(500); + assertTrue(upgradeJudgement.isUseGrpcFeatures()); + assertTrue(upgradeJudgement.isUseJraftFeatures()); + } + + @Test + public void testUpgradeCheckSelfFail() throws Exception { + Collection members = mockMember("2.0.0", "2.0.0", "2.0.0"); + Iterator iterator = members.iterator(); + when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(false); + iterator.next(); + while (iterator.hasNext()) { + iterator.next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true); + } + upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build()); + verify(raftPeerSet, never()).init(); + verify(raftCore, never()).init(); + verify(versionJudgement, never()).reset(); + TimeUnit.MILLISECONDS.sleep(500); + assertFalse(upgradeJudgement.isUseGrpcFeatures()); + assertFalse(upgradeJudgement.isUseJraftFeatures()); + } + + @Test + public void testUpgradeCheckOthersFail() throws Exception { + Collection members = mockMember("2.0.0", "2.0.0", "2.0.0"); + when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(true); + upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build()); + verify(raftPeerSet, never()).init(); + verify(raftCore, never()).init(); + verify(versionJudgement, never()).reset(); + TimeUnit.MILLISECONDS.sleep(500); + assertFalse(upgradeJudgement.isUseGrpcFeatures()); + assertFalse(upgradeJudgement.isUseJraftFeatures()); + } + + @Test + public void testDowngradeOneFor14XNode() throws Exception { + upgradeJudgement.setUseGrpcFeatures(true); + upgradeJudgement.setUseJraftFeatures(true); + Collection members = mockMember("1.4.0", "2.0.0", "2.0.0"); + upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build()); + verify(raftPeerSet, never()).init(); + verify(raftCore, never()).init(); + verify(versionJudgement, never()).reset(); + TimeUnit.MILLISECONDS.sleep(500); + assertFalse(upgradeJudgement.isUseGrpcFeatures()); + assertTrue(upgradeJudgement.isUseJraftFeatures()); + } + + @Test + public void testDowngradeTwoNode() throws Exception { + upgradeJudgement.setUseGrpcFeatures(true); + upgradeJudgement.setUseJraftFeatures(true); + Collection members = mockMember("", "", "2.0.0"); + upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build()); + verify(raftPeerSet).init(); + verify(raftCore).init(); + verify(versionJudgement).reset(); + TimeUnit.MILLISECONDS.sleep(500); + assertFalse(upgradeJudgement.isUseGrpcFeatures()); + assertFalse(upgradeJudgement.isUseJraftFeatures()); + } + + @Test + public void testDowngradeOneNode() throws Exception { + upgradeJudgement.setUseGrpcFeatures(true); + upgradeJudgement.setUseJraftFeatures(true); + Collection members = mockMember("1.3.2", "2.0.0", "2.0.0"); + upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build()); + verify(raftPeerSet).init(); + verify(raftCore).init(); + verify(versionJudgement).reset(); + TimeUnit.MILLISECONDS.sleep(500); + assertFalse(upgradeJudgement.isUseGrpcFeatures()); + assertFalse(upgradeJudgement.isUseJraftFeatures()); + } + + private Collection mockMember(String... versions) { + Collection result = new HashSet<>(); + for (int i = 0; i < versions.length; i++) { + Member member = new Member(); + member.setPort(i); + if (StringUtils.isNotBlank(versions[i])) { + member.setExtendVal(MemberMetaDataConstants.VERSION, versions[i]); + } + result.add(member); + } + when(memberManager.getSelf()).thenReturn(result.iterator().next()); + when(memberManager.allMembers()).thenReturn(result); + return result; + } +} diff --git a/naming/src/test/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2ImplTest.java b/naming/src/test/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2ImplTest.java index 3b00bfc49..86218a4ef 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2ImplTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2ImplTest.java @@ -21,6 +21,7 @@ import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate; import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent; import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager; import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement; import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.push.v2.task.PushDelayTask; import com.alibaba.nacos.naming.push.v2.task.PushDelayTaskExecuteEngine; @@ -62,11 +63,15 @@ public class NamingSubscriberServiceV2ImplTest { @Mock private Client client; + @Mock + private UpgradeJudgement upgradeJudgement; + private NamingSubscriberServiceV2Impl subscriberService; @Before public void setUp() throws Exception { - subscriberService = new NamingSubscriberServiceV2Impl(clientManager, indexesManager, null, null); + subscriberService = new NamingSubscriberServiceV2Impl(clientManager, indexesManager, null, null, + upgradeJudgement); ReflectionTestUtils.setField(subscriberService, "delayTaskEngine", delayTaskEngine); when(indexesManager.getAllClientsSubscribeService(service)).thenReturn(Collections.singletonList(testClientId)); when(indexesManager.getAllClientsSubscribeService(service1))