Add double write logic for upgrade

This commit is contained in:
KomachiSion 2021-01-25 17:10:12 +08:00
parent 4096026048
commit cecf0c0893
38 changed files with 986 additions and 107 deletions

View File

@ -116,10 +116,10 @@ public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
log.warn("distro task {} takes {}ms", task, duration);
log.warn("task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[DISTRO-FAILED] " + e.toString(), e);
log.error("[TASK-FAILED] " + e.toString(), e);
}
}
}

View File

@ -21,7 +21,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>2.0.0-1-SNAPSHOT</version>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -40,5 +40,8 @@ public class MemberMetaDataConstants {
public static final String SUPPORT_REMOTE_C_TYPE = "remoteConnectType";
public static final String[] BASIC_META_KEYS = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT, WEIGHT, VERSION};
public static final String READY_TO_UPGRADE = "readyToUpgrade";
public static final String[] BASIC_META_KEYS = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT, WEIGHT, VERSION,
READY_TO_UPGRADE};
}

View File

@ -34,11 +34,13 @@ import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.apache.commons.lang3.StringUtils;
import org.javatuples.Pair;
import org.springframework.context.annotation.DependsOn;
@ -105,6 +107,10 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
// If upgrade to 2.0.X, do not sync for v1.
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}

View File

@ -24,6 +24,7 @@ import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import org.springframework.stereotype.Component;
@ -51,9 +52,12 @@ public class DistroClientComponentRegistry {
private final ClusterRpcClientProxy clusterRpcClientProxy;
private final UpgradeJudgement upgradeJudgement;
public DistroClientComponentRegistry(ServerMemberManager serverMemberManager, DistroProtocol distroProtocol,
DistroComponentHolder componentHolder, DistroTaskEngineHolder taskEngineHolder, GlobalConfig globalConfig,
ClientManagerDelegate clientManager, ClusterRpcClientProxy clusterRpcClientProxy) {
ClientManagerDelegate clientManager, ClusterRpcClientProxy clusterRpcClientProxy,
UpgradeJudgement upgradeJudgement) {
this.serverMemberManager = serverMemberManager;
this.distroProtocol = distroProtocol;
this.componentHolder = componentHolder;
@ -61,6 +65,7 @@ public class DistroClientComponentRegistry {
this.globalConfig = globalConfig;
this.clientManager = clientManager;
this.clusterRpcClientProxy = clusterRpcClientProxy;
this.upgradeJudgement = upgradeJudgement;
}
/**
@ -69,7 +74,8 @@ public class DistroClientComponentRegistry {
*/
@PostConstruct
public void doRegister() {
DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol);
DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol,
upgradeJudgement);
DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy,
serverMemberManager);
DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(globalConfig,

View File

@ -35,6 +35,7 @@ import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
import com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
@ -57,9 +58,13 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro
private final DistroProtocol distroProtocol;
public DistroClientDataProcessor(ClientManager clientManager, DistroProtocol distroProtocol) {
private final UpgradeJudgement upgradeJudgement;
public DistroClientDataProcessor(ClientManager clientManager, DistroProtocol distroProtocol,
UpgradeJudgement upgradeJudgement) {
this.clientManager = clientManager;
this.distroProtocol = distroProtocol;
this.upgradeJudgement = upgradeJudgement;
NotifyCenter.registerSubscriber(this);
}
@ -76,6 +81,9 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro
if (EnvUtil.getStandaloneMode()) {
return;
}
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
ClientEvent clientEvent = (ClientEvent) event;
Client client = clientEvent.getClient();
// Only ephemeral data sync by Distro, persist client should sync by raft.

View File

@ -100,13 +100,19 @@ public class ClusterVersionJudgement {
for (ConsumerWithPriority consumer : observers) {
consumer.consumer.accept(true);
}
observers.clear();
}
public boolean allMemberIsNewVersion() {
return allMemberIsNewVersion;
}
/**
* Only used for upgrade to 2.0.0
*/
public void reset() {
allMemberIsNewVersion = false;
}
private static class ConsumerWithPriority implements Comparable<ConsumerWithPriority> {
private final Consumer<Boolean> consumer;

View File

@ -25,10 +25,12 @@ import com.alibaba.nacos.auth.annotation.Secured;
import com.alibaba.nacos.auth.common.ActionTypes;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.CatalogService;
import com.alibaba.nacos.naming.core.CatalogServiceV1Impl;
import com.alibaba.nacos.naming.core.CatalogServiceV2Impl;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.healthcheck.HealthCheckTask;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.web.NamingResourceParser;
@ -63,6 +65,9 @@ public class CatalogController {
@Autowired
private CatalogServiceV2Impl catalogServiceV2;
@Autowired
private UpgradeJudgement upgradeJudgement;
/**
* Get service detail.
*
@ -77,7 +82,7 @@ public class CatalogController {
String serviceName) throws NacosException {
String serviceNameWithoutGroup = NamingUtils.getServiceName(serviceName);
String groupName = NamingUtils.getGroupName(serviceName);
return catalogServiceV2.getServiceDetail(namespaceId, groupName, serviceNameWithoutGroup);
return judgeCatalogService().getServiceDetail(namespaceId, groupName, serviceNameWithoutGroup);
}
/**
@ -98,7 +103,7 @@ public class CatalogController {
@RequestParam int pageSize) throws NacosException {
String serviceNameWithoutGroup = NamingUtils.getServiceName(serviceName);
String groupName = NamingUtils.getGroupName(serviceName);
List<? extends Instance> instances = catalogServiceV2
List<? extends Instance> instances = judgeCatalogService()
.listInstances(namespaceId, groupName, serviceNameWithoutGroup, clusterName);
int start = (page - 1) * pageSize;
int end = page * pageSize;
@ -146,9 +151,9 @@ public class CatalogController {
@RequestParam(required = false) boolean hasIpCount) throws NacosException {
if (withInstances) {
return catalogServiceV2.pageListServiceDetail(namespaceId, groupName, serviceName, pageNo, pageSize);
return judgeCatalogService().pageListServiceDetail(namespaceId, groupName, serviceName, pageNo, pageSize);
}
return catalogServiceV2
return judgeCatalogService()
.pageListService(namespaceId, groupName, serviceName, pageNo, pageSize, containedInstance, hasIpCount);
}
@ -187,4 +192,8 @@ public class CatalogController {
result.replace("clusters", clusters);
return result;
}
private CatalogService judgeCatalogService() {
return upgradeJudgement.isUseGrpcFeatures() ? catalogServiceV2 : catalogServiceV1;
}
}

View File

@ -23,9 +23,11 @@ import com.alibaba.nacos.api.naming.pojo.healthcheck.HealthCheckerFactory;
import com.alibaba.nacos.auth.annotation.Secured;
import com.alibaba.nacos.auth.common.ActionTypes;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.ClusterOperator;
import com.alibaba.nacos.naming.core.ClusterOperatorV1Impl;
import com.alibaba.nacos.naming.core.ClusterOperatorV2Impl;
import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.web.NamingResourceParser;
import org.apache.commons.lang3.BooleanUtils;
@ -46,11 +48,15 @@ import javax.servlet.http.HttpServletRequest;
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/cluster")
public class ClusterController {
private final UpgradeJudgement upgradeJudgement;
private final ClusterOperatorV1Impl clusterOperatorV1;
private final ClusterOperatorV2Impl clusterOperatorV2;
public ClusterController(ClusterOperatorV1Impl clusterOperatorV1, ClusterOperatorV2Impl clusterOperatorV2) {
public ClusterController(UpgradeJudgement upgradeJudgement, ClusterOperatorV1Impl clusterOperatorV1,
ClusterOperatorV2Impl clusterOperatorV2) {
this.upgradeJudgement = upgradeJudgement;
this.clusterOperatorV1 = clusterOperatorV1;
this.clusterOperatorV2 = clusterOperatorV2;
}
@ -79,8 +85,11 @@ public class ClusterController {
clusterMetadata.setHealthyCheckType(healthChecker.getType());
clusterMetadata.setExtendData(
UtilsAndCommons.parseMetadata(WebUtils.optional(request, "metadata", StringUtils.EMPTY)));
clusterOperatorV2.updateClusterMetadata(namespaceId, serviceName, clusterName, clusterMetadata);
judgeClusterOperator().updateClusterMetadata(namespaceId, serviceName, clusterName, clusterMetadata);
return "ok";
}
private ClusterOperator judgeClusterOperator() {
return upgradeJudgement.isUseGrpcFeatures() ? clusterOperatorV2 : clusterOperatorV1;
}
}

View File

@ -25,9 +25,12 @@ import com.alibaba.nacos.auth.common.ActionTypes;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.InstanceOperator;
import com.alibaba.nacos.naming.core.InstanceOperatorClientImpl;
import com.alibaba.nacos.naming.core.InstanceOperatorServiceImpl;
import com.alibaba.nacos.naming.core.InstancePatchObject;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
@ -82,7 +85,13 @@ public class InstanceController {
private ServiceManager serviceManager;
@Autowired
private InstanceOperatorClientImpl instanceService;
private InstanceOperatorClientImpl instanceServiceV2;
@Autowired
private InstanceOperatorServiceImpl instanceServiceV1;
@Autowired
private UpgradeJudgement upgradeJudgement;
/**
* Register new instance.
@ -102,8 +111,8 @@ public class InstanceController {
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
instanceService.registerInstance(namespaceId, serviceName, instance);
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}
@ -122,8 +131,8 @@ public class InstanceController {
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
instanceService.removeInstance(namespaceId, serviceName, instance);
getInstanceOperator().removeInstance(namespaceId, serviceName, instance);
return "ok";
}
@ -141,7 +150,7 @@ public class InstanceController {
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
instanceService.updateInstance(namespaceId, serviceName, parseInstance(request));
getInstanceOperator().updateInstance(namespaceId, serviceName, parseInstance(request));
return "ok";
}
@ -305,7 +314,7 @@ public class InstanceController {
patchObject.setEnabled(BooleanUtils.toBoolean(enabledString));
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
instanceService.patchInstance(namespaceId, serviceName, patchObject);
getInstanceOperator().patchInstance(namespaceId, serviceName, patchObject);
return "ok";
}
@ -336,10 +345,9 @@ public class InstanceController {
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
Subscriber subscriber =
udpPort > 0 ? new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,
udpPort, clusters) : null;
return instanceService.listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);
Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,
udpPort, clusters);
return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);
}
/**
@ -360,7 +368,7 @@ public class InstanceController {
String ip = WebUtils.required(request, "ip");
int port = Integer.parseInt(WebUtils.required(request, "port"));
com.alibaba.nacos.api.naming.pojo.Instance instance = instanceService
com.alibaba.nacos.api.naming.pojo.Instance instance = getInstanceOperator()
.getInstance(namespaceId, serviceName, cluster, ip, port);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put("service", serviceName);
@ -413,10 +421,10 @@ public class InstanceController {
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
int resultCode = instanceService.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat);
int resultCode = getInstanceOperator().handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat);
result.put(CommonParams.CODE, resultCode);
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
instanceService.getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
@ -443,7 +451,7 @@ public class InstanceController {
}
NamingUtils.checkServiceNameFormat(serviceName);
List<? extends com.alibaba.nacos.api.naming.pojo.Instance> ips = instanceService
List<? extends com.alibaba.nacos.api.naming.pojo.Instance> ips = getInstanceOperator()
.listAllInstances(namespaceId, serviceName);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
@ -516,4 +524,8 @@ public class InstanceController {
return instance;
}
private InstanceOperator getInstanceOperator() {
return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}
}

View File

@ -32,10 +32,12 @@ import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.ServiceOperator;
import com.alibaba.nacos.naming.core.ServiceOperatorV1Impl;
import com.alibaba.nacos.naming.core.ServiceOperatorV2Impl;
import com.alibaba.nacos.naming.core.SubscribeManager;
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
@ -93,6 +95,9 @@ public class ServiceController {
@Autowired
private ServiceOperatorV2Impl serviceOperatorV2;
@Autowired
private UpgradeJudgement upgradeJudgement;
/**
* Create a new service. This API will create a persistence service.
*
@ -115,7 +120,7 @@ public class ServiceController {
serviceMetadata.setSelector(parseSelector(selector));
serviceMetadata.setExtendData(UtilsAndCommons.parseMetadata(metadata));
serviceMetadata.setEphemeral(false);
serviceOperatorV2.create(namespaceId, serviceName, serviceMetadata);
getServiceOperator().create(namespaceId, serviceName, serviceMetadata);
return "ok";
}
@ -131,8 +136,8 @@ public class ServiceController {
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String remove(@RequestParam(defaultValue = Constants.DEFAULT_NAMESPACE_ID) String namespaceId,
@RequestParam String serviceName) throws Exception {
serviceOperatorV2.delete(namespaceId, serviceName);
getServiceOperator().delete(namespaceId, serviceName);
return "ok";
}
@ -193,7 +198,7 @@ public class ServiceController {
String groupName = WebUtils.optional(request, CommonParams.GROUP_NAME, Constants.DEFAULT_GROUP);
String selectorString = WebUtils.optional(request, "selector", StringUtils.EMPTY);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
List<String> serviceNameList = serviceOperatorV2
List<String> serviceNameList = getServiceOperator()
.listService(namespaceId, groupName, selectorString, pageSize, pageNo);
result.replace("doms", JacksonUtils.transferToJsonNode(serviceNameList));
result.put("count", serviceNameList.size());
@ -221,7 +226,7 @@ public class ServiceController {
com.alibaba.nacos.naming.core.v2.pojo.Service service = com.alibaba.nacos.naming.core.v2.pojo.Service
.newService(namespaceId, NamingUtils.getGroupName(serviceName),
NamingUtils.getServiceName(serviceName));
serviceOperatorV2.update(service, serviceMetadata);
getServiceOperator().update(service, serviceMetadata);
return "ok";
}
@ -457,4 +462,8 @@ public class ServiceController {
throw new NacosException(NacosException.INVALID_PARAM, "not match any type of selector!");
}
}
private ServiceOperator getServiceOperator() {
return upgradeJudgement.isUseGrpcFeatures() ? serviceOperatorV2 : serviceOperatorV1;
}
}

View File

@ -74,7 +74,7 @@ public class CatalogServiceV1Impl implements CatalogService {
@Override
public List<? extends Instance> listInstances(String namespaceId, String groupName, String serviceName,
String clusterName) throws NacosException {
Service service = serviceManager.getService(namespaceId, serviceName);
Service service = serviceManager.getService(namespaceId, NamingUtils.getGroupedName(serviceName, groupName));
if (service == null) {
throw new NacosException(NacosException.NOT_FOUND,
String.format("service %s@@%s is not found!", groupName, serviceName));

View File

@ -160,7 +160,7 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
boolean healthOnly) {
Service service = getService(namespaceId, serviceName, true);
if (null != subscriber) {
if (subscriber.getPort() > 0) {
String clientId = IpPortBasedClient.getClientId(subscriber.getAddrStr(), true);
createIpPortClientIfAbsent(clientId, true);
clientOperationService.subscribeService(service, subscriber, clientId);

View File

@ -311,7 +311,7 @@ public class InstanceOperatorServiceImpl implements InstanceOperator {
public long getHeartBeatInterval(String namespaceId, String serviceName, String ip, int port, String cluster) {
com.alibaba.nacos.naming.core.Instance instance = serviceManager
.getInstance(namespaceId, serviceName, cluster, ip, port);
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
if (null != instance && instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
return instance.getInstanceHeartBeatInterval();
}
return switchDomain.getClientBeatInterval();

View File

@ -16,12 +16,13 @@
package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task;
import com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask;
import com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
@ -32,10 +33,10 @@ import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.naming.selector.NoneSelector;
import com.alibaba.nacos.naming.selector.Selector;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.ListUtils;
import org.apache.commons.lang3.StringUtils;
@ -52,8 +53,7 @@ import java.util.Map;
* Service of Nacos server side
*
* <p>We introduce a 'service --> cluster --> instance' model, in which service stores a list of clusters, which
* contain
* a list of instances.
* contain a list of instances.
*
* <p>his class inherits from Service in API module and stores some fields that do not have to expose to client.
*
@ -277,6 +277,9 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
setLastModifiedMillis(System.currentTimeMillis());
getPushService().serviceChanged(this);
ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class)
.addTask(ServiceChangeV1Task.getKey(namespaceId, getName(), ephemeral),
new ServiceChangeV1Task(namespaceId, getName(), ephemeral));
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {

View File

@ -41,7 +41,7 @@ public class ClusterMetadata implements Serializable {
/**
* Whether or not use instance port to do health check.
*/
private boolean useInstancePortForCheck = false;
private boolean useInstancePortForCheck = true;
private Map<String, String> extendData = new ConcurrentHashMap<>(1);

View File

@ -0,0 +1,189 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.util.VersionUtil;
import org.springframework.stereotype.Component;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Ability judgement during upgrading.
*
* @author xiweng.yy
*/
@Component
public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
/**
* Only when all cluster upgrade upper 2.0.0, this features is true.
*/
private final AtomicBoolean useGrpcFeatures = new AtomicBoolean(false);
/**
* Only when all cluster upgrade upper 1.4.0, this features is true.
*/
private final AtomicBoolean useJraftFeatures = new AtomicBoolean(false);
private final RaftPeerSet raftPeerSet;
private final RaftCore raftCore;
private final ClusterVersionJudgement versionJudgement;
private final ServerMemberManager memberManager;
private final ServiceManager serviceManager;
private final DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine;
private final ScheduledExecutorService upgradeChecker;
public UpgradeJudgement(RaftPeerSet raftPeerSet, RaftCore raftCore, ClusterVersionJudgement versionJudgement,
ServerMemberManager memberManager, ServiceManager serviceManager,
DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine) {
this.raftPeerSet = raftPeerSet;
this.raftCore = raftCore;
this.versionJudgement = versionJudgement;
this.memberManager = memberManager;
this.serviceManager = serviceManager;
this.doubleWriteDelayTaskEngine = doubleWriteDelayTaskEngine;
upgradeChecker = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory("upgrading.checker"));
upgradeChecker.scheduleAtFixedRate(() -> {
if (isUseGrpcFeatures()) {
return;
}
boolean canUpgrade = checkForUpgrade();
Loggers.SRV_LOG.info("upgrade check result {}", canUpgrade);
if (canUpgrade) {
doUpgrade();
}
}, 100L, 5000L, TimeUnit.MILLISECONDS);
NotifyCenter.registerSubscriber(this);
}
@JustForTest
void setUseGrpcFeatures(boolean value) {
useGrpcFeatures.set(value);
}
@JustForTest
void setUseJraftFeatures(boolean value) {
useJraftFeatures.set(value);
}
public boolean isUseGrpcFeatures() {
return useGrpcFeatures.get();
}
public boolean isUseJraftFeatures() {
return useJraftFeatures.get();
}
@Override
public void onEvent(MembersChangeEvent event) {
Loggers.SRV_LOG.info("member change, new members {}", event.getMembers());
for (Member each : event.getMembers()) {
Object versionStr = each.getExtendVal(MemberMetaDataConstants.VERSION);
// come from below 1.3.0
if (null == versionStr) {
checkAndDowngrade(false);
return;
}
Version version = VersionUtil.parseVersion(versionStr.toString());
if (version.getMajorVersion() < 2) {
checkAndDowngrade(version.getMinorVersion() >= 4);
return;
}
}
}
private void checkAndDowngrade(boolean jraftFeature) {
boolean isDowngradeGrpc = useGrpcFeatures.compareAndSet(true, false);
boolean isDowngradeJraft = useJraftFeatures.getAndSet(jraftFeature);
if (isDowngradeGrpc && isDowngradeJraft && !jraftFeature) {
Loggers.SRV_LOG.info("Downgrade to 1.X");
try {
raftPeerSet.init();
raftCore.init();
versionJudgement.reset();
} catch (Exception e) {
Loggers.SRV_LOG.error("Downgrade rafe failed ", e);
}
}
}
private boolean checkForUpgrade() {
if (!useGrpcFeatures.get()) {
boolean selfCheckResult = checkServiceAndInstanceNumber() && checkDoubleWriteStatus();
Member self = memberManager.getSelf();
self.setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, selfCheckResult);
memberManager.updateMember(self);
}
boolean result = true;
for (Member each : memberManager.allMembers()) {
Object isReadyToUpgrade = each.getExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE);
result &= null != isReadyToUpgrade && (boolean) isReadyToUpgrade;
}
return result;
}
private boolean checkServiceAndInstanceNumber() {
boolean result = serviceManager.getServiceCount() == MetricsMonitor.getDomCountMonitor().get();
result &= serviceManager.getInstanceCount() == MetricsMonitor.getIpCountMonitor().get();
return result;
}
private boolean checkDoubleWriteStatus() {
return doubleWriteDelayTaskEngine.isEmpty();
}
private void doUpgrade() {
Loggers.SRV_LOG.info("Upgrade to 2.0.X");
useGrpcFeatures.compareAndSet(false, true);
useJraftFeatures.set(true);
}
@Override
public Class<? extends Event> subscribeType() {
return MembersChangeEvent.class;
}
public void shutdown() {
upgradeChecker.shutdownNow();
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine;
import com.alibaba.nacos.naming.misc.Loggers;
import org.springframework.stereotype.Component;
/**
* Double Write task engine.
*
* @author xiweng.yy
*/
@Component
public class DoubleWriteDelayTaskEngine extends NacosDelayTaskExecuteEngine {
public DoubleWriteDelayTaskEngine() {
super(DoubleWriteDelayTaskEngine.class.getSimpleName(), Loggers.SRV_LOG);
addProcessor("v1", new ServiceChangeV1Task.ServiceChangeV1TaskProcessor());
}
@Override
public NacosTaskProcessor getProcessor(Object key) {
String actualKey = key.toString().split(":")[0];
return super.getProcessor(actualKey);
}
}

View File

@ -0,0 +1,148 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata;
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute.DoubleWriteInstanceChangeToV2Task;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute.DoubleWriteMetadataChangeToV2Task;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Double write delay task during upgrading.
*
* @author xiweng.yy
*/
public class ServiceChangeV1Task extends AbstractDelayTask {
private final String namespace;
private final String serviceName;
private final boolean ephemeral;
public ServiceChangeV1Task(String namespace, String serviceName, boolean ephemeral) {
this.namespace = namespace;
this.serviceName = serviceName;
this.ephemeral = ephemeral;
setLastProcessTime(System.currentTimeMillis());
setTaskInterval(1000L);
}
@Override
public void merge(AbstractDelayTask task) {
}
public String getNamespace() {
return namespace;
}
public String getServiceName() {
return serviceName;
}
public boolean isEphemeral() {
return ephemeral;
}
public static String getKey(String namespace, String serviceName, boolean ephemeral) {
return "v1:" + namespace + "_" + serviceName + "_" + ephemeral;
}
public static class ServiceChangeV1TaskProcessor implements NacosTaskProcessor {
@Override
public boolean process(NacosTask task) {
ServiceChangeV1Task serviceTask = (ServiceChangeV1Task) task;
Loggers.SRV_LOG.info("double write for service {}", serviceTask.getServiceName());
ServiceManager serviceManager = ApplicationUtils.getBean(ServiceManager.class);
Service service = serviceManager.getService(serviceTask.getNamespace(), serviceTask.getServiceName());
ServiceMetadata serviceMetadata = parseServiceMetadata(service, serviceTask.isEphemeral());
DoubleWriteMetadataChangeToV2Task metadataTask = new DoubleWriteMetadataChangeToV2Task(
service.getNamespaceId(), service.getName(), serviceTask.isEphemeral(), serviceMetadata);
NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service.getName(), metadataTask);
ServiceStorage serviceStorage = ApplicationUtils.getBean(ServiceStorage.class);
ServiceInfo serviceInfo = serviceStorage.getPushData(transfer(service, serviceTask.isEphemeral()));
List<Instance> newInstance = service.allIPs(serviceTask.isEphemeral());
Set<String> instances = new HashSet<>();
for (Instance each : newInstance) {
instances.add(each.toIpAddr());
DoubleWriteInstanceChangeToV2Task instanceTask = new DoubleWriteInstanceChangeToV2Task(
service.getNamespaceId(), service.getName(), each, true);
NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(
IpPortBasedClient.getClientId(each.toIpAddr(), serviceTask.isEphemeral()), instanceTask);
}
List<com.alibaba.nacos.api.naming.pojo.Instance> oldInstance = serviceInfo.getHosts();
Loggers.SRV_LOG.info("[TMP_DEBUG] oldInstances {} \n new Instances {}", oldInstance, newInstance);
for (com.alibaba.nacos.api.naming.pojo.Instance each : oldInstance) {
if (!instances.contains(each.toInetAddr())) {
DoubleWriteInstanceChangeToV2Task instanceTask = new DoubleWriteInstanceChangeToV2Task(
service.getNamespaceId(), service.getName(), each, false);
NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(
IpPortBasedClient.getClientId(each.toInetAddr(), serviceTask.isEphemeral()), instanceTask);
}
}
return true;
}
private com.alibaba.nacos.naming.core.v2.pojo.Service transfer(Service service, boolean ephemeral) {
return com.alibaba.nacos.naming.core.v2.pojo.Service
.newService(service.getNamespaceId(), service.getGroupName(),
NamingUtils.getServiceName(service.getName()), ephemeral);
}
private ServiceMetadata parseServiceMetadata(Service service, boolean ephemeral) {
ServiceMetadata result = new ServiceMetadata();
result.setEphemeral(ephemeral);
result.setProtectThreshold(service.getProtectThreshold());
result.setSelector(service.getSelector());
result.setExtendData(service.getMetadata());
for (Map.Entry<String, Cluster> entry : service.getClusterMap().entrySet()) {
result.getClusters().put(entry.getKey(), parseClusterMetadata(entry.getValue()));
}
return result;
}
private ClusterMetadata parseClusterMetadata(Cluster cluster) {
ClusterMetadata result = new ClusterMetadata();
result.setHealthyCheckPort(cluster.getDefCkport());
result.setUseInstancePortForCheck(cluster.isUseIPPort4Check());
result.setExtendData(cluster.getMetadata());
result.setHealthChecker(cluster.getHealthChecker());
result.setHealthyCheckType(cluster.getHealthChecker().getType());
return result;
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.naming.core.InstanceOperatorClientImpl;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
/**
* Double write from 1.x to 2 task.
*
* @author xiweng.yy
*/
public class DoubleWriteInstanceChangeToV2Task extends AbstractExecuteTask {
private final String namespace;
private final String serviceName;
private final Instance instance;
private final boolean register;
public DoubleWriteInstanceChangeToV2Task(String namespace, String serviceName, Instance instance,
boolean register) {
this.register = register;
this.namespace = namespace;
this.serviceName = serviceName;
this.instance = instance;
}
@Override
public void run() {
try {
InstanceOperatorClientImpl instanceOperator = ApplicationUtils.getBean(InstanceOperatorClientImpl.class);
if (register) {
instanceOperator.registerInstance(namespace, serviceName, instance);
} else {
instanceOperator.removeInstance(namespace, serviceName, instance);
}
} catch (Exception e) {
ServiceChangeV1Task retryTask = new ServiceChangeV1Task(namespace, serviceName, instance.isEphemeral());
retryTask.setTaskInterval(3000L);
String taskKey = ServiceChangeV1Task.getKey(namespace, serviceName, instance.isEphemeral());
ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask);
}
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute;
import com.alibaba.nacos.api.naming.pojo.healthcheck.HealthCheckType;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.naming.core.v2.metadata.ClusterMetadata;
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService;
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.ServiceChangeV1Task;
import com.alibaba.nacos.naming.selector.NoneSelector;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import java.util.Map;
/**
* Double write from 1.x to 2 task.
*
* @author xiweng.yy
*/
public class DoubleWriteMetadataChangeToV2Task extends AbstractExecuteTask {
private final Service service;
private final ServiceMetadata serviceMetadata;
public DoubleWriteMetadataChangeToV2Task(String namespace, String serviceName, boolean ephemeral,
ServiceMetadata serviceMetadata) {
this.serviceMetadata = serviceMetadata;
String groupName = NamingUtils.getGroupName(serviceName);
String serviceNameWithoutGroup = NamingUtils.getServiceName(serviceName);
this.service = Service.newService(namespace, groupName, serviceNameWithoutGroup, ephemeral);
}
@Override
public void run() {
try {
NamingMetadataOperateService metadataOperate = ApplicationUtils.getBean(NamingMetadataOperateService.class);
if (!isDefaultServiceMetadata()) {
metadataOperate.updateServiceMetadata(service, serviceMetadata);
}
for (Map.Entry<String, ClusterMetadata> entry : serviceMetadata.getClusters().entrySet()) {
if (!isDefaultClusterMetadata(entry.getValue())) {
metadataOperate.addClusterMetadata(service, entry.getKey(), entry.getValue());
}
}
} catch (Exception e) {
ServiceChangeV1Task retryTask = new ServiceChangeV1Task(service.getNamespace(),
service.getGroupedServiceName(), service.isEphemeral());
retryTask.setTaskInterval(3000L);
String taskKey = ServiceChangeV1Task
.getKey(service.getNamespace(), service.getGroupedServiceName(), service.isEphemeral());
ApplicationUtils.getBean(DoubleWriteDelayTaskEngine.class).addTask(taskKey, retryTask);
}
}
private boolean isDefaultServiceMetadata() {
return serviceMetadata.getExtendData().isEmpty() && serviceMetadata.getProtectThreshold() == 0.0F
&& serviceMetadata.getSelector() instanceof NoneSelector && serviceMetadata.isEphemeral();
}
private boolean isDefaultClusterMetadata(ClusterMetadata metadata) {
return HealthCheckType.TCP.name().equals(metadata.getHealthyCheckType()) && metadata.getExtendData().isEmpty()
&& metadata.getHealthyCheckPort() == 80 && metadata.isUseInstancePortForCheck();
}
}

View File

@ -24,6 +24,7 @@ import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.healthcheck.heartbeat.BeatCheckTask;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.HttpClient;
@ -77,6 +78,10 @@ public class ClientBeatCheckTask implements BeatCheckTask {
@Override
public void run() {
try {
// If upgrade to 2.0.X stop health check with v1
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
if (!getDistroMapper().responsible(service.getName())) {
return;
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.DistroMapper;
@ -77,6 +78,10 @@ public class HealthCheckTask implements Runnable {
public void run() {
try {
// If upgrade to 2.0.X stop health check with v1
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
if (distroMapper.responsible(cluster.getService().getName()) && switchDomain
.isHealthCheckEnabled(cluster.getService().getName())) {
healthCheckProcessor.process(this);

View File

@ -81,7 +81,11 @@ public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCh
}
@Override
public void afterIntercept() {
public void passIntercept() {
doHealthCheck();
}
@Override
public void afterIntercept() {
}
}

View File

@ -19,6 +19,7 @@ package com.alibaba.nacos.naming.healthcheck.heartbeat;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
import com.alibaba.nacos.naming.core.v2.pojo.HealthCheckInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
@ -66,6 +67,7 @@ public class ClientBeatProcessorV2 implements BeatProcessor {
Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
}
}
}

View File

@ -53,12 +53,16 @@ public class InstanceBeatCheckTask implements Interceptable {
}
@Override
public void afterIntercept() {
public void passIntercept() {
for (InstanceBeatChecker each : CHECKERS) {
each.doCheck(client, service, instancePublishInfo);
}
}
@Override
public void afterIntercept() {
}
public Client getClient() {
return client;
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.naming.healthcheck.interceptor;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.healthcheck.NacosHealthCheckTask;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
@ -29,7 +30,12 @@ public class HealthCheckEnableInterceptor extends AbstractHealthCheckInterceptor
@Override
public boolean intercept(NacosHealthCheckTask object) {
return !ApplicationUtils.getBean(SwitchDomain.class).isHealthCheckEnabled();
try {
return !ApplicationUtils.getBean(SwitchDomain.class).isHealthCheckEnabled() || !ApplicationUtils
.getBean(UpgradeJudgement.class).isUseGrpcFeatures();
} catch (Exception e) {
return true;
}
}
@Override

View File

@ -127,10 +127,17 @@ public class HealthCheckTaskV2 extends AbstractExecuteTask implements NacosHealt
}
@Override
public void afterIntercept() {
public void passIntercept() {
doHealthCheck();
}
@Override
public void afterIntercept() {
if (!cancelled) {
HealthCheckReactor.scheduleCheck(this);
}
}
@Override
public void run() {
doHealthCheck();

View File

@ -96,7 +96,7 @@ public class HealthCheckCommonV2 {
String clusterName = instance.getExtendDatum().get(CommonParams.CLUSTER_NAME).toString();
if (instance.getOkCount().incrementAndGet() >= switchDomain.getCheckTimes()) {
if (switchDomain.isHealthCheckEnabled(serviceName) && !task.isCancelled() && distroMapper
.responsible(serviceName)) {
.responsible(task.getClient().getResponsibleId())) {
healthStatusSynchronizer.instanceHealthStatusChange(true, task.getClient(), service, instance);
Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",
serviceName, instance.getIp(), instance.getPort(), clusterName,
@ -130,7 +130,7 @@ public class HealthCheckCommonV2 {
String clusterName = instance.getExtendDatum().get(CommonParams.CLUSTER_NAME).toString();
if (instance.getFailCount().incrementAndGet() >= switchDomain.getCheckTimes()) {
if (switchDomain.isHealthCheckEnabled(serviceName) && !task.isCancelled() && distroMapper
.responsible(serviceName)) {
.responsible(task.getClient().getResponsibleId())) {
healthStatusSynchronizer.instanceHealthStatusChange(false, task.getClient(), service, instance);
Loggers.EVT_LOG
.info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
@ -165,7 +165,7 @@ public class HealthCheckCommonV2 {
String serviceName = service.getGroupedServiceName();
String clusterName = instance.getExtendDatum().get(CommonParams.CLUSTER_NAME).toString();
if (switchDomain.isHealthCheckEnabled(serviceName) && !task.isCancelled() && distroMapper
.responsible(serviceName)) {
.responsible(task.getClient().getResponsibleId())) {
healthStatusSynchronizer.instanceHealthStatusChange(false, task.getClient(), service, instance);
Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
serviceName, instance.getIp(), instance.getPort(), clusterName,

View File

@ -279,7 +279,7 @@ public class TcpHealthCheckProcessor implements HealthCheckProcessorV2, Runnable
healthCheckCommon.checkFail(task, service, msg);
}
keyMap.remove(task.toString());
keyMap.remove(toString());
}
healthCheckCommon.reEvaluateCheckRT(rt, task, switchDomain.getTcpHealthParams());

View File

@ -60,9 +60,10 @@ public abstract class AbstractNamingInterceptorChain<T extends Interceptable>
continue;
}
if (each.intercept(object)) {
object.afterIntercept();
return;
}
}
object.afterIntercept();
object.passIntercept();
}
}

View File

@ -26,5 +26,10 @@ public interface Interceptable {
/**
* If no {@link NacosNamingInterceptor} intercept this object, this method will be called to execute.
*/
void passIntercept();
/**
* If one {@link NacosNamingInterceptor} intercept this object, this method will be called.
*/
void afterIntercept();
}

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
@ -34,6 +35,7 @@ import com.alibaba.nacos.naming.remote.udp.AckEntry;
import com.alibaba.nacos.naming.remote.udp.AckPacket;
import com.alibaba.nacos.naming.remote.udp.UdpConnector;
import com.alibaba.nacos.naming.utils.Constants;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.apache.commons.collections.MapUtils;
import org.codehaus.jackson.util.VersionUtil;
import org.springframework.beans.BeansException;
@ -114,6 +116,10 @@ public class UdpPushService implements ApplicationContextAware, ApplicationListe
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
// If upgrade to 2.0.X, do not push for v1.
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();

View File

@ -26,6 +26,7 @@ import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.NamingSubscriberService;
import com.alibaba.nacos.naming.push.v2.executor.PushExecutorDelegate;
@ -54,11 +55,14 @@ public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements Na
private final PushDelayTaskExecuteEngine delayTaskEngine;
private final UpgradeJudgement upgradeJudgement;
public NamingSubscriberServiceV2Impl(ClientManagerDelegate clientManager,
ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage,
PushExecutorDelegate pushExecutor) {
PushExecutorDelegate pushExecutor, UpgradeJudgement upgradeJudgement) {
this.clientManager = clientManager;
this.indexesManager = indexesManager;
this.upgradeJudgement = upgradeJudgement;
this.delayTaskEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage,
pushExecutor);
NotifyCenter.registerSubscriber(this);
@ -106,6 +110,9 @@ public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements Na
@Override
public void onEvent(Event event) {
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, 500L));

View File

@ -92,46 +92,49 @@ public class DistroFilter implements Filter {
throw new NoSuchMethodException(req.getMethod() + " " + path);
}
if (!method.isAnnotationPresent(CanDistro.class)) {
filterChain.doFilter(req, resp);
return;
}
String distroTag = distroTagGenerator.getResponsibleTag(req);
// proxy request to other server if necessary:
if (method.isAnnotationPresent(CanDistro.class) && !distroMapper.responsible(distroTag)) {
String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);
if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
// This request is sent from peer server, should not be redirected again:
Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
"receive invalid redirect request from peer " + req.getRemoteAddr());
return;
}
final String targetServer = distroMapper.mapSrv(distroTag);
List<String> headerList = new ArrayList<>(16);
Enumeration<String> headers = req.getHeaderNames();
while (headers.hasMoreElements()) {
String headerName = headers.nextElement();
headerList.add(headerName);
headerList.add(req.getHeader(headerName));
}
final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name());
final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());
RestResult<String> result = HttpClient
.request("http://" + targetServer + req.getRequestURI(), headerList, paramsValue, body,
PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod());
String data = result.ok() ? result.getData() : result.getMessage();
try {
WebUtils.response(resp, data, result.getCode());
} catch (Exception ignore) {
Loggers.SRV_LOG
.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
}
} else {
if (distroMapper.responsible(distroTag)) {
filterChain.doFilter(req, resp);
return;
}
// proxy request to other server if necessary:
String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);
if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
// This request is sent from peer server, should not be redirected again:
Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
"receive invalid redirect request from peer " + req.getRemoteAddr());
return;
}
final String targetServer = distroMapper.mapSrv(distroTag);
List<String> headerList = new ArrayList<>(16);
Enumeration<String> headers = req.getHeaderNames();
while (headers.hasMoreElements()) {
String headerName = headers.nextElement();
headerList.add(headerName);
headerList.add(req.getHeader(headerName));
}
final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name());
final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());
RestResult<String> result = HttpClient
.request("http://" + targetServer + req.getRequestURI(), headerList, paramsValue, body,
PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod());
String data = result.ok() ? result.getData() : result.getMessage();
try {
WebUtils.response(resp, data, result.getCode());
} catch (Exception ignore) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
}
} catch (AccessControlException e) {
resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
@ -147,6 +150,6 @@ public class DistroFilter implements Filter {
@Override
public void destroy() {
}
}

View File

@ -16,10 +16,8 @@
package com.alibaba.nacos.naming.web;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.ReuseHttpServletRequest;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import org.springframework.stereotype.Component;
/**
@ -30,14 +28,14 @@ import org.springframework.stereotype.Component;
@Component
public class DistroTagGeneratorImpl implements DistroTagGenerator {
private final ServerMemberManager serverMemberManager;
private final DistroTagGenerator serviceNameTag = new DistroServiceNameTagGenerator();
private final DistroTagGenerator ipPortTag = new DistroIpPortTagGenerator();
public DistroTagGeneratorImpl(ServerMemberManager serverMemberManager) {
this.serverMemberManager = serverMemberManager;
private final UpgradeJudgement upgradeJudgement;
public DistroTagGeneratorImpl(UpgradeJudgement upgradeJudgement) {
this.upgradeJudgement = upgradeJudgement;
}
@Override
@ -56,11 +54,6 @@ public class DistroTagGeneratorImpl implements DistroTagGenerator {
* @return actual tag generator
*/
private DistroTagGenerator getTagGenerator() {
for (Member each : serverMemberManager.allMembers()) {
if (null == each.getExtendVal(MemberMetaDataConstants.SUPPORT_REMOTE_C_TYPE)) {
return serviceNameTag;
}
}
return ipPortTag;
return upgradeJudgement.isUseGrpcFeatures() ? ipPortTag : serviceNameTag;
}
}

View File

@ -0,0 +1,228 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.core.env.ConfigurableEnvironment;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class UpgradeJudgementTest {
@Mock
private ConfigurableEnvironment environment;
@Mock
private RaftPeerSet raftPeerSet;
@Mock
private RaftCore raftCore;
@Mock
private ClusterVersionJudgement versionJudgement;
@Mock
private ServerMemberManager memberManager;
@Mock
private ServiceManager serviceManager;
@Mock
private DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine;
private UpgradeJudgement upgradeJudgement;
@Before
public void setUp() throws Exception {
EnvUtil.setEnvironment(environment);
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
doubleWriteDelayTaskEngine);
}
@After
public void tearDown() {
upgradeJudgement.shutdown();
}
@Test
public void testUpgradeOneNode() throws Exception {
Collection<Member> members = mockMember("1.3.2", "1.3.2", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(500);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradeOneFor14XNode() throws Exception {
Collection<Member> members = mockMember("1.4.0", "2.0.0", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(500);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradeTwoNode() throws Exception {
Collection<Member> members = mockMember("", "2.0.0", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(500);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradeCheckSucc() throws Exception {
Collection<Member> members = mockMember("2.0.0-snapshot", "2.0.0", "2.0.0");
Iterator<Member> iterator = members.iterator();
when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(true);
iterator.next();
while (iterator.hasNext()) {
iterator.next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true);
}
upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(500);
assertTrue(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradeCheckSelfFail() throws Exception {
Collection<Member> members = mockMember("2.0.0", "2.0.0", "2.0.0");
Iterator<Member> iterator = members.iterator();
when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(false);
iterator.next();
while (iterator.hasNext()) {
iterator.next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true);
}
upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(500);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradeCheckOthersFail() throws Exception {
Collection<Member> members = mockMember("2.0.0", "2.0.0", "2.0.0");
when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(true);
upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(500);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testDowngradeOneFor14XNode() throws Exception {
upgradeJudgement.setUseGrpcFeatures(true);
upgradeJudgement.setUseJraftFeatures(true);
Collection<Member> members = mockMember("1.4.0", "2.0.0", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(500);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testDowngradeTwoNode() throws Exception {
upgradeJudgement.setUseGrpcFeatures(true);
upgradeJudgement.setUseJraftFeatures(true);
Collection<Member> members = mockMember("", "", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build());
verify(raftPeerSet).init();
verify(raftCore).init();
verify(versionJudgement).reset();
TimeUnit.MILLISECONDS.sleep(500);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testDowngradeOneNode() throws Exception {
upgradeJudgement.setUseGrpcFeatures(true);
upgradeJudgement.setUseJraftFeatures(true);
Collection<Member> members = mockMember("1.3.2", "2.0.0", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().members(members).build());
verify(raftPeerSet).init();
verify(raftCore).init();
verify(versionJudgement).reset();
TimeUnit.MILLISECONDS.sleep(500);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
private Collection<Member> mockMember(String... versions) {
Collection<Member> result = new HashSet<>();
for (int i = 0; i < versions.length; i++) {
Member member = new Member();
member.setPort(i);
if (StringUtils.isNotBlank(versions[i])) {
member.setExtendVal(MemberMetaDataConstants.VERSION, versions[i]);
}
result.add(member);
}
when(memberManager.getSelf()).thenReturn(result.iterator().next());
when(memberManager.allMembers()).thenReturn(result);
return result;
}
}

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.task.PushDelayTask;
import com.alibaba.nacos.naming.push.v2.task.PushDelayTaskExecuteEngine;
@ -62,11 +63,15 @@ public class NamingSubscriberServiceV2ImplTest {
@Mock
private Client client;
@Mock
private UpgradeJudgement upgradeJudgement;
private NamingSubscriberServiceV2Impl subscriberService;
@Before
public void setUp() throws Exception {
subscriberService = new NamingSubscriberServiceV2Impl(clientManager, indexesManager, null, null);
subscriberService = new NamingSubscriberServiceV2Impl(clientManager, indexesManager, null, null,
upgradeJudgement);
ReflectionTestUtils.setField(subscriberService, "delayTaskEngine", delayTaskEngine);
when(indexesManager.getAllClientsSubscribeService(service)).thenReturn(Collections.singletonList(testClientId));
when(indexesManager.getAllClientsSubscribeService(service1))