Remove old raft classes

This commit is contained in:
KomachiSion 2022-08-26 14:45:10 +08:00
parent 6f58ebf0c2
commit 49e12c8b8e
33 changed files with 40 additions and 4147 deletions

View File

@ -18,31 +18,25 @@ package com.alibaba.nacos.naming.cluster;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.InternetAddressUtil;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.Message;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.ServerStatusSynchronizer;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.Synchronizer;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.common.utils.StringUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -76,7 +70,6 @@ public class ServerListManager extends MemberChangeListener {
@PostConstruct
public void init() {
GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());
}
/**
@ -124,11 +117,10 @@ public class ServerListManager extends MemberChangeListener {
Loggers.SRV_LOG.warn("received malformed distro map data: {}", config);
continue;
}
String[] info = InternetAddressUtil.splitIPPortStr(params[1]);
Member server = Optional.ofNullable(memberManager.find(params[1]))
.orElse(Member.builder().ip(info[0]).state(NodeState.UP)
.port(Integer.parseInt(info[1])).build());
.orElse(Member.builder().ip(info[0]).state(NodeState.UP).port(Integer.parseInt(info[1])).build());
// This metadata information exists from 1.3.0 onwards "version"
if (server.getExtendVal(MemberMetaDataConstants.VERSION) == null) {
@ -150,51 +142,6 @@ public class ServerListManager extends MemberChangeListener {
}
}
private class ServerInfoUpdater implements Runnable {
private int cursor = 0;
@Override
public void run() {
List<Member> members = servers;
if (members.isEmpty()) {
return;
}
this.cursor = (this.cursor + 1) % members.size();
Member target = members.get(cursor);
if (Objects.equals(target.getAddress(), EnvUtil.getLocalAddress())) {
return;
}
// This metadata information exists from 1.3.0 onwards "version"
if (target.getExtendVal(MemberMetaDataConstants.VERSION) != null) {
return;
}
final String path =
UtilsAndCommons.NACOS_NAMING_OPERATOR_CONTEXT + UtilsAndCommons.NACOS_NAMING_CLUSTER_CONTEXT
+ "/state";
final Map<String, String> params = new HashMap(2);
final String server = target.getAddress();
try {
String content = NamingProxy.reqCommon(path, params, server, false);
if (!StringUtils.EMPTY.equals(content)) {
RaftPeer raftPeer = JacksonUtils.toObj(content, RaftPeer.class);
if (null != raftPeer) {
String json = JacksonUtils.toJson(raftPeer);
Map map = JacksonUtils.toObj(json, HashMap.class);
target.setExtendVal("naming", map);
memberManager.update(target);
}
}
} catch (Exception ignore) {
//
}
}
}
private class ServerStatusReporter implements Runnable {
@Override
@ -211,19 +158,19 @@ public class ServerListManager extends MemberChangeListener {
}
long curTime = System.currentTimeMillis();
String status = LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight
+ "\r\n";
String status =
LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight + "\r\n";
List<Member> allServers = getServers();
if (!contains(EnvUtil.getLocalAddress())) {
Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}",
EnvUtil.getLocalAddress(), allServers);
Loggers.SRV_LOG
.error("local ip is not in serverlist, ip: {}, serverlist: {}", EnvUtil.getLocalAddress(),
allServers);
return;
}
if (allServers.size() > 0 && !EnvUtil.getLocalAddress()
.contains(InternetAddressUtil.localHostIP())) {
if (allServers.size() > 0 && !EnvUtil.getLocalAddress().contains(InternetAddressUtil.localHostIP())) {
for (Member server : allServers) {
if (Objects.equals(server.getAddress(), EnvUtil.getLocalAddress())) {
continue;

View File

@ -23,7 +23,6 @@ import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.impl.BasePersistentServiceProcessor;
import com.alibaba.nacos.naming.consistency.persistent.impl.PersistentServiceProcessor;
import com.alibaba.nacos.naming.consistency.persistent.impl.StandalonePersistentServiceProcessor;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.springframework.stereotype.Component;
@ -38,29 +37,10 @@ import java.util.Optional;
@Component("persistentConsistencyServiceDelegate")
public class PersistentConsistencyServiceDelegateImpl implements PersistentConsistencyService {
private final ClusterVersionJudgement versionJudgement;
private final RaftConsistencyServiceImpl oldPersistentConsistencyService;
private final BasePersistentServiceProcessor newPersistentConsistencyService;
private volatile boolean switchNewPersistentService = false;
public PersistentConsistencyServiceDelegateImpl(ClusterVersionJudgement versionJudgement,
RaftConsistencyServiceImpl oldPersistentConsistencyService, ProtocolManager protocolManager)
throws Exception {
this.versionJudgement = versionJudgement;
this.oldPersistentConsistencyService = oldPersistentConsistencyService;
this.newPersistentConsistencyService = createNewPersistentServiceProcessor(protocolManager, versionJudgement);
init();
}
private void init() {
if (EnvUtil.isSupportUpgradeFrom1X()) {
this.versionJudgement.registerObserver(isAllNewVersion -> switchNewPersistentService = isAllNewVersion, -1);
return;
}
this.switchNewPersistentService = true;
public PersistentConsistencyServiceDelegateImpl(ProtocolManager protocolManager) throws Exception {
this.newPersistentConsistencyService = createNewPersistentServiceProcessor(protocolManager);
}
@Override
@ -80,14 +60,12 @@ public class PersistentConsistencyServiceDelegateImpl implements PersistentConsi
@Override
public void listen(String key, RecordListener listener) throws NacosException {
oldPersistentConsistencyService.listen(key, listener);
newPersistentConsistencyService.listen(key, listener);
}
@Override
public void unListen(String key, RecordListener listener) throws NacosException {
newPersistentConsistencyService.unListen(key, listener);
oldPersistentConsistencyService.unListen(key, listener);
}
@Override
@ -101,14 +79,14 @@ public class PersistentConsistencyServiceDelegateImpl implements PersistentConsi
}
private PersistentConsistencyService switchOne() {
return switchNewPersistentService ? newPersistentConsistencyService : oldPersistentConsistencyService;
return newPersistentConsistencyService;
}
private BasePersistentServiceProcessor createNewPersistentServiceProcessor(ProtocolManager protocolManager,
ClusterVersionJudgement versionJudgement) throws Exception {
private BasePersistentServiceProcessor createNewPersistentServiceProcessor(ProtocolManager protocolManager)
throws Exception {
final BasePersistentServiceProcessor processor =
EnvUtil.getStandaloneMode() ? new StandalonePersistentServiceProcessor(versionJudgement)
: new PersistentServiceProcessor(protocolManager, versionJudgement);
EnvUtil.getStandaloneMode() ? new StandalonePersistentServiceProcessor()
: new PersistentServiceProcessor(protocolManager);
processor.afterConstruct();
return processor;
}

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.common.utils.TypeUtils;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.Serializer;
@ -34,15 +35,13 @@ import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.constants.Constants;
import com.google.protobuf.ByteString;
import com.alibaba.nacos.common.utils.TypeUtils;
import java.lang.reflect.Type;
import java.nio.file.Paths;
@ -106,16 +105,13 @@ public abstract class BasePersistentServiceProcessor extends RequestProcessor4CP
protected final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
protected final ClusterVersionJudgement versionJudgement;
protected final PersistentNotifier notifier;
protected final int queueMaxSize = 16384;
protected final int priority = 10;
public BasePersistentServiceProcessor(final ClusterVersionJudgement judgement) throws Exception {
this.versionJudgement = judgement;
public BasePersistentServiceProcessor() throws Exception {
this.kvStorage = new NamingKvStorage(Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString());
this.serializer = SerializeFactory.getSerializer("JSON");
this.notifier = new PersistentNotifier(key -> {
@ -132,16 +128,6 @@ public abstract class BasePersistentServiceProcessor extends RequestProcessor4CP
@SuppressWarnings("unchecked")
public void afterConstruct() {
NotifyCenter.registerToPublisher(ValueChangeEvent.class, queueMaxSize);
listenOldRaftClose();
}
private void listenOldRaftClose() {
this.versionJudgement.registerObserver(isNewVersion -> {
if (isNewVersion) {
NotifyCenter.registerSubscriber(notifier);
startNotify = true;
}
}, priority);
}
@Override

View File

@ -30,10 +30,9 @@ import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.google.protobuf.ByteString;
@ -58,9 +57,7 @@ public class PersistentServiceProcessor extends BasePersistentServiceProcessor {
*/
private volatile boolean hasLeader = false;
public PersistentServiceProcessor(ProtocolManager protocolManager, ClusterVersionJudgement versionJudgement)
throws Exception {
super(versionJudgement);
public PersistentServiceProcessor(ProtocolManager protocolManager) throws Exception {
this.protocol = protocolManager.getCpProtocol();
}

View File

@ -24,9 +24,8 @@ import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.pojo.Record;
import com.google.protobuf.ByteString;
import java.util.Collections;
@ -41,8 +40,7 @@ import java.util.Optional;
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class StandalonePersistentServiceProcessor extends BasePersistentServiceProcessor {
public StandalonePersistentServiceProcessor(final ClusterVersionJudgement judgement) throws Exception {
super(judgement);
public StandalonePersistentServiceProcessor() throws Exception {
}
@Override

View File

@ -1,48 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
import org.springframework.context.ApplicationEvent;
/**
* Base raft event.
*
* @deprecated will remove in 1.4.x
* @author pbting
* @date 2019-07-01 8:46 PM
*/
@Deprecated
public abstract class BaseRaftEvent extends ApplicationEvent {
private final RaftPeer raftPeer;
private final RaftPeer local;
public BaseRaftEvent(Object source, RaftPeer raftPeer, RaftPeer local) {
super(source);
this.raftPeer = raftPeer;
this.local = local;
}
public RaftPeer getRaftPeer() {
return raftPeer;
}
public RaftPeer getLocal() {
return local;
}
}

View File

@ -1,33 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
/**
* Leader election finished event.
*
* @deprecated will remove in 1.4.x
* @author pbting
* @date 2019-07-01 8:25 PM
*/
@Deprecated
public class LeaderElectFinishedEvent extends BaseRaftEvent {
public LeaderElectFinishedEvent(Object source, RaftPeer raftPeer, RaftPeer local) {
super(source, raftPeer, local);
}
}

View File

@ -1,32 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
/**
* Make leader event.
*
* @deprecated will remove in 1.4.x
* @author pbting
* @date 2019-07-01 8:45 PM
*/
@Deprecated
public class MakeLeaderEvent extends BaseRaftEvent {
public MakeLeaderEvent(Object source, RaftPeer raftPeer, RaftPeer local) {
super(source, raftPeer, local);
}
}

View File

@ -1,183 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.naming.cluster.ServerStatus;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Optional;
/**
* Use simplified Raft protocol to maintain the consistency status of Nacos cluster.
*
* @author nkorange
* @since 1.0.0
* @deprecated will remove in 1.4.x
*/
@Deprecated
@DependsOn("ProtocolManager")
@Service
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {
private final RaftCore raftCore;
private final RaftPeerSet peers;
private final SwitchDomain switchDomain;
private volatile boolean stopWork = false;
private String errorMsg;
public RaftConsistencyServiceImpl(ClusterVersionJudgement versionJudgement, RaftCore raftCore,
SwitchDomain switchDomain) {
this.raftCore = raftCore;
this.peers = raftCore.getPeerSet();
this.switchDomain = switchDomain;
versionJudgement.registerObserver(isAllNewVersion -> {
stopWork = isAllNewVersion;
if (stopWork) {
try {
this.raftCore.shutdown();
} catch (NacosException e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
}
}
}, 1000);
}
@PostConstruct
protected void init() throws Exception {
if (EnvUtil.getProperty(Constants.NACOS_NAMING_USE_NEW_RAFT_FIRST, Boolean.class, false)) {
this.raftCore.shutdown();
}
}
@Override
public void put(String key, Record value) throws NacosException {
checkIsStopWork();
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
e);
}
}
@Override
public void remove(String key) throws NacosException {
checkIsStopWork();
try {
if (KeyBuilder.matchInstanceListKey(key) && !raftCore.isLeader()) {
raftCore.onDelete(key, peers.getLeader());
} else {
raftCore.signalDelete(key);
}
raftCore.unListenAll(key);
} catch (Exception e) {
Loggers.RAFT.error("Raft remove failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft remove failed, key:" + key, e);
}
}
@Override
public Datum get(String key) throws NacosException {
checkIsStopWork();
return raftCore.getDatum(key);
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
raftCore.listen(key, listener);
}
@Override
public void unListen(String key, RecordListener listener) throws NacosException {
raftCore.unListen(key, listener);
}
@Override
public boolean isAvailable() {
return raftCore.isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
}
@Override
public Optional<String> getErrorMsg() {
String errorMsg;
if (!raftCore.isInitialized() && !ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus())) {
errorMsg = "The old raft protocol node is not initialized";
} else {
errorMsg = null;
}
return Optional.ofNullable(errorMsg);
}
/**
* Put a new datum from other server.
*
* @param datum datum
* @param source source server
* @throws NacosException nacos exception
*/
public void onPut(Datum datum, RaftPeer source) throws NacosException {
try {
raftCore.onPublish(datum, source);
} catch (Exception e) {
Loggers.RAFT.error("Raft onPut failed.", e);
throw new NacosException(NacosException.SERVER_ERROR,
"Raft onPut failed, datum:" + datum + ", source: " + source, e);
}
}
/**
* Remove a new datum from other server.
*
* @param datum datum
* @param source source server
* @throws NacosException nacos exception
*/
public void onRemove(Datum datum, RaftPeer source) throws NacosException {
try {
raftCore.onDelete(datum.key, source);
} catch (Exception e) {
Loggers.RAFT.error("Raft onRemove failed.", e);
throw new NacosException(NacosException.SERVER_ERROR,
"Raft onRemove failed, datum:" + datum + ", source: " + source, e);
}
}
private void checkIsStopWork() {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
}
}

View File

@ -1,113 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.constants.Constants;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Inject the raft information from the naming module into the outlier information of the node.
*
* @deprecated will remove in 1.4.x
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@Deprecated
@Component
public class RaftListener implements SmartApplicationListener {
private final ServerMemberManager memberManager;
private final ClusterVersionJudgement versionJudgement;
private volatile boolean stopUpdate = false;
/**
* Avoid multithreading mode. Old Raft information data cannot be properly removed.
*/
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
public RaftListener(ServerMemberManager memberManager, ClusterVersionJudgement versionJudgement) {
this.memberManager = memberManager;
this.versionJudgement = versionJudgement;
this.init();
}
private void init() {
this.versionJudgement.registerObserver(isAllNewVersion -> {
final Lock lock = this.writeLock;
lock.lock();
try {
stopUpdate = isAllNewVersion;
if (stopUpdate) {
removeOldRaftMetadata();
}
} finally {
lock.unlock();
}
}, -2);
}
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return BaseRaftEvent.class.isAssignableFrom(eventType);
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
final Lock lock = readLock;
lock.lock();
try {
if (event instanceof BaseRaftEvent && !stopUpdate) {
BaseRaftEvent raftEvent = (BaseRaftEvent) event;
RaftPeer local = raftEvent.getLocal();
String json = JacksonUtils.toJson(local);
Map map = JacksonUtils.toObj(json, HashMap.class);
Member self = memberManager.getSelf();
self.setExtendVal(Constants.OLD_NAMING_RAFT_GROUP, map);
memberManager.update(self);
}
if (stopUpdate) {
removeOldRaftMetadata();
}
} finally {
lock.unlock();
}
}
void removeOldRaftMetadata() {
Loggers.RAFT.warn("start to move old raft protocol metadata");
Member self = memberManager.getSelf();
self.delExtendVal(Constants.OLD_NAMING_RAFT_GROUP);
memberManager.update(self);
}
}

View File

@ -1,95 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.common.utils.RandomUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
/**
* Raft peer.
*
* @deprecated will remove in 1.4.x
* @author nacos
*/
@Deprecated
public class RaftPeer {
public String ip;
public String voteFor;
public AtomicLong term = new AtomicLong(0L);
public volatile long leaderDueMs = RandomUtils.nextLong(0, GlobalExecutor.LEADER_TIMEOUT_MS);
public volatile long heartbeatDueMs = RandomUtils.nextLong(0, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
public volatile State state = State.FOLLOWER;
public void resetLeaderDue() {
leaderDueMs = GlobalExecutor.LEADER_TIMEOUT_MS + RandomUtils.nextLong(0, GlobalExecutor.RANDOM_MS);
}
public void resetHeartbeatDue() {
heartbeatDueMs = GlobalExecutor.HEARTBEAT_INTERVAL_MS;
}
public enum State {
/**
* Leader of the cluster, only one leader stands in a cluster.
*/
LEADER,
/**
* Follower of the cluster, report to and copy from leader.
*/
FOLLOWER,
/**
* Candidate leader to be elected.
*/
CANDIDATE
}
@Override
public int hashCode() {
return Objects.hash(ip);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (!(obj instanceof RaftPeer)) {
return false;
}
RaftPeer other = (RaftPeer) obj;
return StringUtils.equals(ip, other.ip);
}
@Override
public String toString() {
return "RaftPeer{" + "ip='" + ip + '\'' + ", voteFor='" + voteFor + '\'' + ", term=" + term + ", leaderDueMs="
+ leaderDueMs + ", heartbeatDueMs=" + heartbeatDueMs + ", state=" + state + '}';
}
}

View File

@ -1,379 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.apache.commons.collections.SortedBag;
import org.apache.commons.collections.bag.TreeBag;
import com.alibaba.nacos.common.utils.StringUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/**
* Sets of raft peers.
*
* @author nacos
* @deprecated will remove in 1.4.x
*/
@Deprecated
@Component
@DependsOn("ProtocolManager")
public class RaftPeerSet extends MemberChangeListener implements Closeable {
private final ServerMemberManager memberManager;
private AtomicLong localTerm = new AtomicLong(0L);
private RaftPeer leader = null;
private volatile Map<String, RaftPeer> peers = new HashMap<>(8);
private Set<String> sites = new HashSet<>();
private volatile boolean ready = false;
private Set<Member> oldMembers = new HashSet<>();
public RaftPeerSet(ServerMemberManager memberManager) {
this.memberManager = memberManager;
}
/**
* Init method.
*/
@PostConstruct
public void init() {
if (!EnvUtil.isSupportUpgradeFrom1X()) {
return;
}
NotifyCenter.registerSubscriber(this);
changePeers(memberManager.allMembers());
}
@Override
public void shutdown() throws NacosException {
this.localTerm.set(-1);
this.leader = null;
this.peers.clear();
this.sites.clear();
this.ready = false;
this.oldMembers.clear();
}
public RaftPeer getLeader() {
if (EnvUtil.getStandaloneMode()) {
return local();
}
return leader;
}
public Set<String> allSites() {
return sites;
}
public boolean isReady() {
return ready;
}
/**
* Remove raft node.
*
* @param servers node address need to be removed
*/
public void remove(List<String> servers) {
for (String server : servers) {
peers.remove(server);
}
}
/**
* Update raft peer.
*
* @param peer new peer.
* @return new peer
*/
public RaftPeer update(RaftPeer peer) {
peers.put(peer.ip, peer);
return peer;
}
/**
* Judge whether input address is leader.
*
* @param ip peer address
* @return true if is leader or stand alone, otherwise false
*/
public boolean isLeader(String ip) {
if (EnvUtil.getStandaloneMode()) {
return true;
}
if (leader == null) {
Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
return false;
}
return StringUtils.equals(leader.ip, ip);
}
public Set<String> allServersIncludeMyself() {
return peers.keySet();
}
/**
* Get all servers excludes current peer.
*
* @return all servers excludes current peer
*/
public Set<String> allServersWithoutMySelf() {
Set<String> servers = new HashSet<String>(peers.keySet());
// exclude myself
servers.remove(local().ip);
return servers;
}
public Collection<RaftPeer> allPeers() {
return peers.values();
}
public int size() {
return peers.size();
}
/**
* Calculate and decide which peer is leader. If has new peer has more than half vote, change leader to new peer.
*
* @param candidate new candidate
* @return new leader if new candidate has more than half vote, otherwise old leader
*/
public RaftPeer decideLeader(RaftPeer candidate) {
peers.put(candidate.ip, candidate);
SortedBag ips = new TreeBag();
int maxApproveCount = 0;
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
ips.add(peer.voteFor);
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
if (maxApproveCount >= majorityCount()) {
RaftPeer peer = peers.get(maxApprovePeer);
peer.state = RaftPeer.State.LEADER;
if (!Objects.equals(leader, peer)) {
leader = peer;
ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
return leader;
}
/**
* Set leader as new candidate.
*
* @param candidate new candidate
* @return new leader
*/
public RaftPeer makeLeader(RaftPeer candidate) {
if (!Objects.equals(leader, candidate)) {
leader = candidate;
ApplicationUtils.publishEvent(new MakeLeaderEvent(this, leader, local()));
Loggers.RAFT
.info("{} has become the LEADER, local: {}, leader: {}", leader.ip, JacksonUtils.toJson(local()),
JacksonUtils.toJson(leader));
}
for (final RaftPeer peer : peers.values()) {
Map<String, String> params = new HashMap<>(1);
if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
try {
String url = RaftCore.buildUrl(peer.ip, RaftCore.API_GET_PEER);
HttpClient.asyncHttpGet(url, null, params, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT
.error("[NACOS-RAFT] get peer failed: {}, peer: {}", result.getCode(), peer.ip);
peer.state = RaftPeer.State.FOLLOWER;
return;
}
update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
peer.state = RaftPeer.State.FOLLOWER;
Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
}
}
}
return update(candidate);
}
/**
* Get local raft peer.
*
* @return local raft peer
*/
public RaftPeer local() {
RaftPeer peer = peers.get(EnvUtil.getLocalAddress());
if (peer == null && EnvUtil.getStandaloneMode()) {
RaftPeer localPeer = new RaftPeer();
localPeer.ip = NetUtils.localServer();
localPeer.term.set(localTerm.get());
peers.put(localPeer.ip, localPeer);
return localPeer;
}
if (peer == null) {
throw new IllegalStateException(
"unable to find local peer: " + NetUtils.localServer() + ", all peers: " + Arrays
.toString(peers.keySet().toArray()));
}
return peer;
}
public RaftPeer get(String server) {
return peers.get(server);
}
public int majorityCount() {
return peers.size() / 2 + 1;
}
/**
* Reset set.
*/
public void reset() {
leader = null;
for (RaftPeer peer : peers.values()) {
peer.voteFor = null;
}
}
public void setTerm(long term) {
localTerm.set(term);
}
public long getTerm() {
return localTerm.get();
}
public boolean contains(RaftPeer remote) {
return peers.containsKey(remote.ip);
}
@Override
public void onEvent(MembersChangeEvent event) {
Collection<Member> members = event.getMembers();
Collection<Member> newMembers = new HashSet<>(members);
newMembers.removeAll(oldMembers);
// If an IP change occurs, the change starts
if (!newMembers.isEmpty()) {
changePeers(members);
}
oldMembers.clear();
oldMembers.addAll(members);
}
protected void changePeers(Collection<Member> members) {
Map<String, RaftPeer> tmpPeers = new HashMap<>(members.size());
for (Member member : members) {
final String address = member.getAddress();
if (peers.containsKey(address)) {
tmpPeers.put(address, peers.get(address));
continue;
}
RaftPeer raftPeer = new RaftPeer();
raftPeer.ip = address;
// first time meet the local server:
if (EnvUtil.getLocalAddress().equals(address)) {
raftPeer.term.set(localTerm.get());
}
tmpPeers.put(address, raftPeer);
}
// replace raft peer set:
peers = tmpPeers;
ready = true;
Loggers.RAFT.info("raft peers changed: " + members);
}
@Override
public String toString() {
return "RaftPeerSet{" + "localTerm=" + localTerm + ", leader=" + leader + ", peers=" + peers + ", sites="
+ sites + '}';
}
}

View File

@ -1,118 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.InternetAddressUtil;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
import java.util.Map;
import static com.alibaba.nacos.common.constant.RequestUrlConstants.HTTP_PREFIX;
/**
* Raft http proxy.
*
* @deprecated will remove in 1.4.x
* @author nacos
*/
@Deprecated
@Component
public class RaftProxy {
/**
* Proxy get method.
*
* @param server target server
* @param api api path
* @param params parameters
* @throws Exception any exception during request
*/
public void proxyGet(String server, String api, Map<String, String> params) throws Exception {
// do proxy
if (!InternetAddressUtil.containsPort(server)) {
server = server + InternetAddressUtil.IP_PORT_SPLITER + EnvUtil.getPort();
}
String url = HTTP_PREFIX + server + EnvUtil.getContextPath() + api;
RestResult<String> result = HttpClient.httpGet(url, null, params);
if (!result.ok()) {
throw new IllegalStateException("leader failed, caused by: " + result.getMessage());
}
}
/**
* Proxy specified method.
*
* @param server target server
* @param api api path
* @param params parameters
* @param method http method
* @throws Exception any exception during request
*/
public void proxy(String server, String api, Map<String, String> params, HttpMethod method) throws Exception {
// do proxy
if (!InternetAddressUtil.containsPort(server)) {
server = server + InternetAddressUtil.IP_PORT_SPLITER + EnvUtil.getPort();
}
String url = HTTP_PREFIX + server + EnvUtil.getContextPath() + api;
RestResult<String> result;
switch (method) {
case GET:
result = HttpClient.httpGet(url, null, params);
break;
case POST:
result = HttpClient.httpPost(url, null, params);
break;
case DELETE:
result = HttpClient.httpDelete(url, null, params);
break;
default:
throw new RuntimeException("unsupported method:" + method);
}
if (!result.ok()) {
throw new IllegalStateException("leader failed, caused by: " + result.getMessage());
}
}
/**
* Proxy post method with large body.
*
* @param server target server
* @param api api path
* @param content body
* @param headers headers
* @throws Exception any exception during request
*/
public void proxyPostLarge(String server, String api, String content, Map<String, String> headers)
throws Exception {
// do proxy
if (!InternetAddressUtil.containsPort(server)) {
server = server + InternetAddressUtil.IP_PORT_SPLITER + EnvUtil.getPort();
}
String url = HTTP_PREFIX + server + EnvUtil.getContextPath() + api;
RestResult<String> result = HttpClient.httpPostLarge(url, headers, content);
if (!result.ok()) {
throw new IllegalStateException("leader failed, caused by: " + result.getMessage());
}
}
}

View File

@ -1,352 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.alibaba.nacos.common.utils.StringUtils;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.DATA_BASE_DIR;
/**
* Raft store.
*
* @author nacos
* @deprecated will remove in 1.4.x
*/
@Deprecated
@Component
public class RaftStore implements Closeable {
private final Properties meta = new Properties();
private static final String META_FILE_NAME = DATA_BASE_DIR + File.separator + "meta.properties";
private static final String CACHE_DIR = DATA_BASE_DIR + File.separator + "data";
/**
* Load datum from cache file.
*
* @param notifier raft notifier
* @param datums cached datum map
* @throws Exception any exception during load
*/
public synchronized void loadDatums(PersistentNotifier notifier, Map<String, Datum> datums) throws Exception {
Datum datum;
long start = System.currentTimeMillis();
for (File cache : listCaches()) {
if (cache.isDirectory() && cache.listFiles() != null) {
for (File datumFile : cache.listFiles()) {
datum = readDatum(datumFile, cache.getName());
if (datum != null) {
datums.put(datum.key, datum);
if (notifier != null) {
NotifyCenter.publishEvent(
ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
}
}
}
continue;
}
datum = readDatum(cache, StringUtils.EMPTY);
if (datum != null) {
datums.put(datum.key, datum);
}
}
Loggers.RAFT.info("finish loading all datums, size: {} cost {} ms.", datums.size(),
(System.currentTimeMillis() - start));
}
/**
* Load Metadata from cache file.
*
* @return metadata
* @throws Exception any exception during load
*/
public synchronized Properties loadMeta() throws Exception {
File metaFile = new File(META_FILE_NAME);
if (!metaFile.exists() && !metaFile.getParentFile().mkdirs() && !metaFile.createNewFile()) {
throw new IllegalStateException("failed to create meta file: " + metaFile.getAbsolutePath());
}
try (FileInputStream inStream = new FileInputStream(metaFile)) {
meta.load(inStream);
}
return meta;
}
/**
* Load datum from cache file by key.
*
* @param key datum key
* @return datum
* @throws Exception any exception during load
*/
public synchronized Datum load(String key) throws Exception {
long start = System.currentTimeMillis();
// load data
for (File cache : listCaches()) {
if (!cache.isFile()) {
Loggers.RAFT.warn("warning: encountered directory in cache dir: {}", cache.getAbsolutePath());
}
if (!StringUtils.equals(cache.getName(), encodeDatumKey(key))) {
continue;
}
Loggers.RAFT.info("finish loading datum, key: {} cost {} ms.", key, (System.currentTimeMillis() - start));
return readDatum(cache, StringUtils.EMPTY);
}
return null;
}
private synchronized Datum readDatum(File file, String namespaceId) throws IOException {
if (!KeyBuilder.isDatumCacheFile(file.getName())) {
return null;
}
ByteBuffer buffer;
try (FileChannel fc = new FileInputStream(file).getChannel()) {
buffer = ByteBuffer.allocate((int) file.length());
fc.read(buffer);
String json = new String(buffer.array(), StandardCharsets.UTF_8);
if (StringUtils.isBlank(json)) {
return null;
}
final String fileName = file.getName();
if (KeyBuilder.matchSwitchKey(fileName)) {
return JacksonUtils.toObj(json, new TypeReference<Datum<SwitchDomain>>() {
});
}
if (KeyBuilder.matchServiceMetaKey(fileName)) {
Datum<Service> serviceDatum;
try {
serviceDatum = JacksonUtils.toObj(json.replace("\\", ""), new TypeReference<Datum<Service>>() {
});
} catch (Exception e) {
JsonNode jsonObject = JacksonUtils.toObj(json);
serviceDatum = new Datum<>();
serviceDatum.timestamp.set(jsonObject.get("timestamp").asLong());
serviceDatum.key = jsonObject.get("key").asText();
serviceDatum.value = JacksonUtils.toObj(jsonObject.get("value").toString(), Service.class);
}
if (StringUtils.isBlank(serviceDatum.value.getGroupName())) {
serviceDatum.value.setGroupName(Constants.DEFAULT_GROUP);
}
if (!serviceDatum.value.getName().contains(Constants.SERVICE_INFO_SPLITER)) {
serviceDatum.value.setName(
Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER + serviceDatum.value.getName());
}
return serviceDatum;
}
if (KeyBuilder.matchInstanceListKey(fileName)) {
Datum<Instances> instancesDatum;
try {
instancesDatum = JacksonUtils.toObj(json, new TypeReference<Datum<Instances>>() {
});
} catch (Exception e) {
JsonNode jsonObject = JacksonUtils.toObj(json);
instancesDatum = new Datum<>();
instancesDatum.timestamp.set(jsonObject.get("timestamp").asLong());
String key = jsonObject.get("key").asText();
String serviceName = KeyBuilder.getServiceName(key);
key = key.substring(0, key.indexOf(serviceName)) + Constants.DEFAULT_GROUP
+ Constants.SERVICE_INFO_SPLITER + serviceName;
instancesDatum.key = key;
instancesDatum.value = new Instances();
instancesDatum.value.setInstanceList(
JacksonUtils.toObj(jsonObject.get("value").toString(), new TypeReference<List<Instance>>() {
}));
if (!instancesDatum.value.getInstanceList().isEmpty()) {
for (Instance instance : instancesDatum.value.getInstanceList()) {
instance.setEphemeral(false);
}
}
}
return instancesDatum;
}
return JacksonUtils.toObj(json, Datum.class);
} catch (Exception e) {
Loggers.RAFT.warn("waning: failed to deserialize key: {}", file.getName());
throw e;
}
}
private String cacheFileName(String namespaceId, String datumKey) {
String fileName;
if (StringUtils.isNotBlank(namespaceId)) {
fileName = CACHE_DIR + File.separator + namespaceId + File.separator + encodeDatumKey(datumKey);
} else {
fileName = CACHE_DIR + File.separator + encodeDatumKey(datumKey);
}
return fileName;
}
/**
* Write datum to cache file.
*
* @param datum datum
* @throws Exception any exception during writing
*/
public synchronized void write(final Datum datum) throws Exception {
String namespaceId = KeyBuilder.getNamespace(datum.key);
File cacheFile = new File(cacheFileName(namespaceId, datum.key));
if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {
MetricsMonitor.getDiskException().increment();
throw new IllegalStateException("can not make cache file: " + cacheFile.getName());
}
ByteBuffer data;
data = ByteBuffer.wrap(JacksonUtils.toJson(datum).getBytes(StandardCharsets.UTF_8));
try (FileChannel fc = new FileOutputStream(cacheFile, false).getChannel()) {
fc.write(data, data.position());
fc.force(true);
} catch (Exception e) {
MetricsMonitor.getDiskException().increment();
throw e;
}
// remove old format file:
if (StringUtils.isNoneBlank(namespaceId)) {
if (datum.key.contains(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER)) {
String oldDatumKey = datum.key
.replace(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER, StringUtils.EMPTY);
cacheFile = new File(cacheFileName(namespaceId, oldDatumKey));
if (cacheFile.exists() && !cacheFile.delete()) {
Loggers.RAFT.error("[RAFT-DELETE] failed to delete old format datum: {}, value: {}", datum.key,
datum.value);
throw new IllegalStateException("failed to delete old format datum: " + datum.key);
}
}
}
}
private File[] listCaches() throws Exception {
File cacheDir = new File(CACHE_DIR);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("cloud not make out directory: " + cacheDir.getName());
}
return cacheDir.listFiles();
}
/**
* Delete datum from cache file.
*
* @param datum datum
*/
public void delete(Datum datum) {
// datum key contains namespace info:
String namespaceId = KeyBuilder.getNamespace(datum.key);
if (StringUtils.isNotBlank(namespaceId)) {
File cacheFile = new File(cacheFileName(namespaceId, datum.key));
if (cacheFile.exists() && !cacheFile.delete()) {
Loggers.RAFT.error("[RAFT-DELETE] failed to delete datum: {}, value: {}", datum.key, datum.value);
throw new IllegalStateException("failed to delete datum: " + datum.key);
}
}
}
/**
* Update term Metadata.
*
* @param term term
* @throws Exception any exception during update
*/
public void updateTerm(long term) throws Exception {
File file = new File(META_FILE_NAME);
if (!file.exists() && !file.getParentFile().mkdirs() && !file.createNewFile()) {
throw new IllegalStateException("failed to create meta file");
}
try (FileOutputStream outStream = new FileOutputStream(file)) {
// write meta
meta.setProperty("term", String.valueOf(term));
meta.store(outStream, null);
}
}
private static String encodeDatumKey(String datumKey) {
return datumKey.replace(':', '#');
}
private static String decodeDatumKey(String datumKey) {
return datumKey.replace("#", ":");
}
@Override
public void shutdown() throws NacosException {
}
}

View File

@ -25,7 +25,6 @@ import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.ServerStatusManager;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.v2.client.Client;
@ -73,20 +72,17 @@ public class OperatorController {
private final DistroMapper distroMapper;
private final RaftCore raftCore;
private final ClientManager clientManager;
public OperatorController(SwitchManager switchManager, ServerListManager serverListManager,
ServerMemberManager memberManager, ServerStatusManager serverStatusManager, SwitchDomain switchDomain,
DistroMapper distroMapper, RaftCore raftCore, ClientManager clientManager) {
DistroMapper distroMapper, ClientManager clientManager) {
this.switchManager = switchManager;
this.serverListManager = serverListManager;
this.memberManager = memberManager;
this.serverStatusManager = serverStatusManager;
this.switchDomain = switchDomain;
this.distroMapper = distroMapper;
this.raftCore = raftCore;
this.clientManager = clientManager;
}
@ -198,7 +194,6 @@ public class OperatorController {
result.put("serviceCount", MetricsMonitor.getDomCountMonitor().get());
result.put("instanceCount", MetricsMonitor.getIpCountMonitor().get());
result.put("subscribeCount", MetricsMonitor.getSubscriberCount().get());
result.put("raftNotifyTaskCount", raftCore.getNotifyTaskCount());
result.put("responsibleInstanceCount", responsibleIpCount);
result.put("clientCount", allClientId.size());
result.put("connectionBasedClientCount", connectionBasedClient);

View File

@ -1,379 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.controllers;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Methods for Raft consistency protocol. These methods should only be invoked by Nacos server itself.
*
* @author nkorange
* @since 1.0.0
* @deprecated will remove in 1.4.x
*/
@Deprecated
@RestController
@RequestMapping({UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_RAFT_CONTEXT,
UtilsAndCommons.NACOS_SERVER_CONTEXT + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ UtilsAndCommons.NACOS_NAMING_RAFT_CONTEXT})
public class RaftController {
private final RaftConsistencyServiceImpl raftConsistencyService;
private final RaftCore raftCore;
private final ClusterVersionJudgement versionJudgement;
public RaftController(RaftConsistencyServiceImpl raftConsistencyService, RaftCore raftCore,
ClusterVersionJudgement versionJudgement) {
this.raftConsistencyService = raftConsistencyService;
this.raftCore = raftCore;
this.versionJudgement = versionJudgement;
}
/**
* Raft vote api.
*
* @param request http request
* @param response http response
* @return peer information
* @throws Exception exception
*/
@PostMapping("/vote")
public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class));
return JacksonUtils.transferToJsonNode(peer);
}
/**
* Beat api.
*
* @param request http request
* @param response http response
* @return peer information
* @throws Exception exception
*/
@PostMapping("/beat")
public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
String value = URLDecoder.decode(entity, "UTF-8");
value = URLDecoder.decode(value, "UTF-8");
JsonNode json = JacksonUtils.toObj(value);
RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
return JacksonUtils.transferToJsonNode(peer);
}
/**
* Get peer information.
*
* @param request http request
* @param response http response
* @return peer information
*/
@GetMapping("/peer")
public JsonNode getPeer(HttpServletRequest request, HttpServletResponse response) {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
List<RaftPeer> peers = raftCore.getPeers();
RaftPeer peer = null;
for (RaftPeer peer1 : peers) {
if (StringUtils.equals(peer1.ip, NetUtils.localServer())) {
peer = peer1;
}
}
if (peer == null) {
peer = new RaftPeer();
peer.ip = NetUtils.localServer();
}
return JacksonUtils.transferToJsonNode(peer);
}
/**
* Datum reload request.
*
* @param request http request
* @param response http response
* @return 'ok' if success
* @throws Exception exception
*/
@PutMapping("/datum/reload")
public String reloadDatum(HttpServletRequest request, HttpServletResponse response) throws Exception {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
String key = WebUtils.required(request, "key");
raftCore.loadDatum(key);
return "ok";
}
/**
* Publish datum.
*
* @param request http request
* @param response http response
* @return 'ok' if success
* @throws Exception exception
*/
@PostMapping("/datum")
public String publish(HttpServletRequest request, HttpServletResponse response) throws Exception {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Content-Encode", "gzip");
String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
String value = URLDecoder.decode(entity, "UTF-8");
JsonNode json = JacksonUtils.toObj(value);
String key = json.get("key").asText();
if (KeyBuilder.matchInstanceListKey(key)) {
raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), Instances.class));
return "ok";
}
if (KeyBuilder.matchSwitchKey(key)) {
raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), SwitchDomain.class));
return "ok";
}
if (KeyBuilder.matchServiceMetaKey(key)) {
raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), Service.class));
return "ok";
}
throw new NacosException(NacosException.INVALID_PARAM, "unknown type publish key: " + key);
}
/**
* Remove datum.
*
* @param request http request
* @param response http response
* @return 'ok' if success
* @throws Exception exception
*/
@DeleteMapping("/datum")
public String delete(HttpServletRequest request, HttpServletResponse response) throws Exception {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Content-Encode", "gzip");
raftConsistencyService.remove(WebUtils.required(request, "key"));
return "ok";
}
/**
* Get datum.
*
* @param request http request
* @param response http response
* @return datum
* @throws Exception exception
*/
@GetMapping("/datum")
public String get(HttpServletRequest request, HttpServletResponse response) throws Exception {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Content-Encode", "gzip");
String keysString = WebUtils.required(request, "keys");
keysString = URLDecoder.decode(keysString, "UTF-8");
String[] keys = keysString.split(",");
List<Datum> datums = new ArrayList<Datum>();
for (String key : keys) {
Datum datum = raftCore.getDatum(key);
datums.add(datum);
}
return JacksonUtils.toJson(datums);
}
/**
* Commit publish datum.
*
* @param request http request
* @param response http response
* @return 'ok' if success
* @throws Exception exception
*/
@PostMapping("/datum/commit")
public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Content-Encode", "gzip");
String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
String value = URLDecoder.decode(entity, "UTF-8");
JsonNode jsonObject = JacksonUtils.toObj(value);
String key = "key";
RaftPeer source = JacksonUtils.toObj(jsonObject.get("source").toString(), RaftPeer.class);
JsonNode datumJson = jsonObject.get("datum");
Datum datum = null;
if (KeyBuilder.matchInstanceListKey(datumJson.get(key).asText())) {
datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Instances>>() {
});
} else if (KeyBuilder.matchSwitchKey(datumJson.get(key).asText())) {
datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<SwitchDomain>>() {
});
} else if (KeyBuilder.matchServiceMetaKey(datumJson.get(key).asText())) {
datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Service>>() {
});
}
raftConsistencyService.onPut(datum, source);
return "ok";
}
/**
* Commit delete datum.
*
* @param request http request
* @param response http response
* @return 'ok' if success
* @throws Exception exception
*/
@DeleteMapping("/datum/commit")
public String onDelete(HttpServletRequest request, HttpServletResponse response) throws Exception {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Content-Encode", "gzip");
String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
String value = URLDecoder.decode(entity, "UTF-8");
value = URLDecoder.decode(value, "UTF-8");
JsonNode jsonObject = JacksonUtils.toObj(value);
Datum datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), Datum.class);
RaftPeer source = JacksonUtils.toObj(jsonObject.get("source").toString(), RaftPeer.class);
raftConsistencyService.onRemove(datum, source);
return "ok";
}
/**
* Elect leader api.
*
* @param request http request
* @param response http response
* @return leader peer information
*/
@GetMapping("/leader")
public JsonNode getLeader(HttpServletRequest request, HttpServletResponse response) {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put("leader", JacksonUtils.toJson(raftCore.getLeader()));
return result;
}
/**
* Get all listeners.
*
* @param request http request
* @param response http response
* @return all listener information
*/
@GetMapping("/listeners")
public JsonNode getAllListeners(HttpServletRequest request, HttpServletResponse response) {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
ObjectNode result = JacksonUtils.createEmptyJsonNode();
Map<String, ConcurrentHashSet<RecordListener>> listeners = raftCore.getListeners();
ArrayNode listenerArray = JacksonUtils.createEmptyArrayNode();
for (String key : listeners.keySet()) {
listenerArray.add(key);
}
result.replace("listeners", listenerArray);
return result;
}
public static String getAcceptEncoding(HttpServletRequest req) {
String encode = StringUtils.defaultIfEmpty(req.getHeader("Accept-Charset"), "UTF-8");
encode = encode.contains(",") ? encode.substring(0, encode.indexOf(",")) : encode;
return encode.contains(";") ? encode.substring(0, encode.indexOf(";")) : encode;
}
}

View File

@ -22,14 +22,13 @@ import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.InternetAddressUtil;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
import com.alibaba.nacos.naming.core.v2.cleaner.EmptyServiceAutoCleaner;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
@ -44,7 +43,6 @@ import com.alibaba.nacos.naming.pojo.InstanceOperationInfo;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.alibaba.nacos.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -98,8 +96,6 @@ public class ServiceManager implements RecordListener<Service> {
private final UdpPushService pushService;
private final RaftPeerSet raftPeerSet;
@Value("${nacos.naming.empty-service.auto-clean:false}")
private boolean emptyServiceAutoClean;
@ -110,12 +106,11 @@ public class ServiceManager implements RecordListener<Service> {
private int cleanEmptyServicePeriod;
public ServiceManager(SwitchDomain switchDomain, DistroMapper distroMapper, ServerMemberManager memberManager,
UdpPushService pushService, RaftPeerSet raftPeerSet) {
UdpPushService pushService) {
this.switchDomain = switchDomain;
this.distroMapper = distroMapper;
this.memberManager = memberManager;
this.pushService = pushService;
this.raftPeerSet = raftPeerSet;
}
/**
@ -163,7 +158,8 @@ public class ServiceManager implements RecordListener<Service> {
* @param serverIP target server ip
* @param checksum checksum of service
*/
public synchronized void addUpdatedServiceToQueue(String namespaceId, String serviceName, String serverIP, String checksum) {
public synchronized void addUpdatedServiceToQueue(String namespaceId, String serviceName, String serverIP,
String checksum) {
try {
toBeUpdatedServicesQueue
.offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
@ -283,10 +279,6 @@ public class ServiceManager implements RecordListener<Service> {
}
}
public RaftPeer getMySelfClusterState() {
return raftPeerSet.local();
}
/**
* Update health status of instance in service.
*
@ -450,7 +442,7 @@ public class ServiceManager implements RecordListener<Service> {
if (service != null) {
return;
}
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
@ -464,7 +456,7 @@ public class ServiceManager implements RecordListener<Service> {
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
@ -951,8 +943,9 @@ public class ServiceManager implements RecordListener<Service> {
List<Instance> instances = service.allIPs();
for (Instance instance : instances) {
if (InternetAddressUtil.containsPort(containedInstance)) {
if (StringUtils.equals(instance.getIp() + InternetAddressUtil.IP_PORT_SPLITER + instance.getPort(),
containedInstance)) {
if (StringUtils
.equals(instance.getIp() + InternetAddressUtil.IP_PORT_SPLITER + instance.getPort(),
containedInstance)) {
contained = true;
break;
}

View File

@ -16,7 +16,6 @@
package com.alibaba.nacos.naming.core.v2.upgrade;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
@ -27,9 +26,6 @@ import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.RefreshStorageDataTask;
@ -67,12 +63,6 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
private final AtomicBoolean all20XVersion = new AtomicBoolean(false);
private final RaftPeerSet raftPeerSet;
private final RaftCore raftCore;
private final ClusterVersionJudgement versionJudgement;
private final ServerMemberManager memberManager;
private final ServiceManager serviceManager;
@ -87,13 +77,8 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
private static final int MINOR_VERSION = 4;
public UpgradeJudgement(RaftPeerSet raftPeerSet, RaftCore raftCore, ClusterVersionJudgement versionJudgement,
ServerMemberManager memberManager, ServiceManager serviceManager,
UpgradeStates upgradeStates,
DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine) {
this.raftPeerSet = raftPeerSet;
this.raftCore = raftCore;
this.versionJudgement = versionJudgement;
public UpgradeJudgement(ServerMemberManager memberManager, ServiceManager serviceManager,
UpgradeStates upgradeStates, DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine) {
this.memberManager = memberManager;
this.serviceManager = serviceManager;
this.doubleWriteDelayTaskEngine = doubleWriteDelayTaskEngine;
@ -113,7 +98,8 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
}
private void initUpgradeChecker() {
selfUpgradeChecker = SelfUpgradeCheckerSpiHolder.findSelfChecker(EnvUtil.getProperty("upgrading.checker.type", "default"));
selfUpgradeChecker = SelfUpgradeCheckerSpiHolder
.findSelfChecker(EnvUtil.getProperty("upgrading.checker.type", "default"));
upgradeChecker = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory("upgrading.checker"));
upgradeChecker.scheduleAtFixedRate(() -> {
if (isUseGrpcFeatures()) {
@ -152,9 +138,9 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
@Override
public void onEvent(MembersChangeEvent event) {
if (!event.hasTriggers()) {
Loggers.SRV_LOG.info("Member change without no trigger. "
+ "It may be triggered by member lookup on startup. "
+ "Skip.");
Loggers.SRV_LOG
.info("Member change without no trigger. " + "It may be triggered by member lookup on startup. "
+ "Skip.");
return;
}
Loggers.SRV_LOG.info("member change, event: {}", event);
@ -182,13 +168,6 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
if (isDowngradeGrpc && isDowngradeJraft && !jraftFeature) {
Loggers.SRV_LOG.info("Downgrade to 1.X");
NotifyCenter.publishEvent(new UpgradeStates.UpgradeStateChangedEvent(false));
try {
raftPeerSet.init();
raftCore.init();
versionJudgement.reset();
} catch (Exception e) {
Loggers.SRV_LOG.error("Downgrade rafe failed ", e);
}
}
}
@ -243,25 +222,4 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {
}
NotifyCenter.deregisterSubscriber(this);
}
/**
* Stop judgement and clear all cache.
*/
public void stopAll() {
try {
Loggers.SRV_LOG.info("Disable Double write, stop and clean v1.x cache and features");
useGrpcFeatures.set(true);
NotifyCenter.publishEvent(new UpgradeStates.UpgradeStateChangedEvent(true));
useJraftFeatures.set(true);
NotifyCenter.deregisterSubscriber(this);
doubleWriteDelayTaskEngine.shutdown();
if (null != upgradeChecker) {
upgradeChecker.shutdownNow();
}
serviceManager.shutdown();
raftCore.shutdown();
} catch (NacosException e) {
Loggers.SRV_LOG.info("Close double write with exception", e);
}
}
}

View File

@ -155,10 +155,6 @@ public class GlobalExecutor {
return NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}
public static void registerServerInfoUpdater(Runnable runnable) {
NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, 2, TimeUnit.SECONDS);
}
public static void registerServerStatusReporter(Runnable runnable, long delay) {
SERVER_STATUS_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}

View File

@ -20,13 +20,9 @@ import com.alibaba.nacos.core.distributed.distro.monitor.DistroRecord;
import com.alibaba.nacos.core.distributed.distro.monitor.DistroRecordsHolder;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientDataProcessor;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -43,12 +39,6 @@ import java.util.concurrent.TimeUnit;
@Component
public class PerformanceLoggerThread {
@Autowired
private RaftCore raftCore;
@Autowired
private ClusterVersionJudgement versionJudgement;
private static final long PERIOD = 60;
@PostConstruct
@ -76,23 +66,6 @@ public class PerformanceLoggerThread {
public void collectMetrics() {
MetricsMonitor.getDomCountMonitor().set(com.alibaba.nacos.naming.core.v2.ServiceManager.getInstance().size());
MetricsMonitor.getAvgPushCostMonitor().set(getAvgPushCost());
metricsRaftLeader();
}
/**
* Will deprecated after v1.4.x
*/
@Deprecated
private void metricsRaftLeader() {
if (!versionJudgement.allMemberIsNewVersion()) {
if (raftCore.isLeader()) {
MetricsMonitor.getLeaderStatusMonitor().set(1);
} else if (raftCore.getPeerSet().local().state == RaftPeer.State.FOLLOWER) {
MetricsMonitor.getLeaderStatusMonitor().set(0);
} else {
MetricsMonitor.getLeaderStatusMonitor().set(2);
}
}
}
class PerformanceLogTask implements Runnable {

View File

@ -16,14 +16,10 @@
package com.alibaba.nacos.naming;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.healthcheck.HealthCheckProcessorDelegate;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.sys.env.EnvUtil;
@ -33,7 +29,6 @@ import org.junit.Rule;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ConfigurableApplicationContext;
@ -63,12 +58,6 @@ public abstract class BaseTest {
@Mock
public ServiceManager serviceManager;
@Mock
public RaftPeerSet peerSet;
@Mock
public RaftCore raftCore;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@ -99,15 +88,6 @@ public abstract class BaseTest {
ApplicationUtils.injectContext(context);
}
protected void mockRaft() {
RaftPeer peer = new RaftPeer();
peer.ip = NetUtils.localServer();
raftCore.setPeerSet(peerSet);
Mockito.when(peerSet.local()).thenReturn(peer);
Mockito.when(peerSet.getLeader()).thenReturn(peer);
Mockito.when(peerSet.isLeader(NetUtils.localServer())).thenReturn(true);
}
protected void mockInjectPushServer() {
doReturn(pushService).when(context).getBean(UdpPushService.class);
}

View File

@ -1,121 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyServiceDelegateImpl;
import com.alibaba.nacos.naming.pojo.Record;
import junit.framework.TestCase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class DelegateConsistencyServiceImplTest extends TestCase {
private DelegateConsistencyServiceImpl delegateConsistencyService;
@Mock
private PersistentConsistencyServiceDelegateImpl persistentConsistencyService;
private static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";
public static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";
private final String ephemeralPrefix = INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX;
@Mock
private Record record;
private String ephemeralKey;
private String peristentKey;
public DelegateConsistencyServiceImplTest() {
}
@Before
public void setUp() {
delegateConsistencyService = new DelegateConsistencyServiceImpl(persistentConsistencyService);
ephemeralKey = ephemeralPrefix + "test-key";
peristentKey = "persistent-test-key";
}
@Test
public void testPut() throws NacosException {
delegateConsistencyService.put(ephemeralKey, record);
verify(persistentConsistencyService, never()).put(ephemeralKey, record);
delegateConsistencyService.put(peristentKey, record);
verify(persistentConsistencyService).put(peristentKey, record);
}
@Test
public void testRemove() throws NacosException {
delegateConsistencyService.remove(ephemeralKey);
verify(persistentConsistencyService, never()).remove(ephemeralKey);
delegateConsistencyService.remove(peristentKey);
verify(persistentConsistencyService).remove(peristentKey);
}
@Test
public void testGet() throws NacosException {
delegateConsistencyService.get(ephemeralKey);
verify(persistentConsistencyService, never()).get(ephemeralKey);
delegateConsistencyService.get(peristentKey);
verify(persistentConsistencyService).get(peristentKey);
}
@Test
public void testListen() throws NacosException {
delegateConsistencyService.listen(ephemeralKey, null);
verify(persistentConsistencyService, never()).listen(ephemeralKey, null);
delegateConsistencyService.listen(peristentKey, null);
verify(persistentConsistencyService).listen(peristentKey, null);
}
@Test
public void testUnListen() throws NacosException {
delegateConsistencyService.unListen(ephemeralKey, null);
verify(persistentConsistencyService, never()).unListen(ephemeralKey, null);
delegateConsistencyService.unListen(peristentKey, null);
verify(persistentConsistencyService).unListen(peristentKey, null);
}
@Test
public void testIsAvailable() {
delegateConsistencyService.isAvailable();
verify(persistentConsistencyService, never()).isAvailable();
}
@Test
public void testGetErrorMsg() {
int ephemeralCalledTimes = 3;
delegateConsistencyService.getErrorMsg();
verify(persistentConsistencyService).getErrorMsg();
}
}

View File

@ -1,151 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.mock.env.MockEnvironment;
import org.springframework.mock.web.MockServletContext;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class ClusterVersionJudgementTest {
private ServerMemberManager manager;
private String[] ipList;
private final int ipCount = 3;
private final String ip1 = "1.1.1.1";
private final String ip2 = "2.2.2.2";
private final String ip3 = "3.3.3.3";
private final int defalutPort = 80;
private final String newVersion = "1.4.0";
private final String oldVersion = "1.3.0";
private List<Member> members;
private Map<String, Object> newVersionMeta;
private Map<String, Object> oldVersionMeta;
private ClusterVersionJudgement judgement;
public ClusterVersionJudgementTest() {
}
@BeforeClass
public static void beforeClass() {
EnvUtil.setEnvironment(new MockEnvironment());
}
@Before
public void beforeMethod() throws Exception {
manager = new ServerMemberManager(new MockServletContext());
newVersionMeta = new HashMap<>(4);
newVersionMeta.put(MemberMetaDataConstants.VERSION, newVersion);
oldVersionMeta = new HashMap<>(4);
oldVersionMeta.put(MemberMetaDataConstants.VERSION, oldVersion);
ipList = new String[ipCount];
ipList[0] = ip1;
ipList[1] = ip2;
ipList[2] = ip3;
members = new LinkedList<>();
members.add(Member.builder().ip(ipList[0]).port(defalutPort).state(NodeState.UP).build());
members.add(Member.builder().ip(ipList[1]).port(defalutPort).state(NodeState.UP).build());
members.add(Member.builder().ip(ipList[2]).port(defalutPort).state(NodeState.UP).build());
manager.memberJoin(members);
}
@After
public void afterMethod() throws Exception {
manager.shutdown();
manager = null;
}
/**
* The member node has version information greater than 1.4.0
*/
@Test
public void testAllMemberIsNewVersion() {
Collection<Member> allMembers = manager.allMembers();
allMembers.forEach(member -> member.setExtendInfo(newVersionMeta));
judgement = new ClusterVersionJudgement(manager);
judgement.judge();
Assert.assertTrue(judgement.allMemberIsNewVersion());
}
@Test
public void testPartMemberIsNewVersion() {
Collection<Member> allMembers = manager.allMembers();
AtomicInteger count = new AtomicInteger();
allMembers.forEach(member -> {
if (count.get() == 0) {
member.setExtendInfo(oldVersionMeta);
} else {
count.incrementAndGet();
member.setExtendInfo(newVersionMeta);
}
});
judgement = new ClusterVersionJudgement(manager);
judgement.judge();
Assert.assertFalse(judgement.allMemberIsNewVersion());
}
@Test
public void testPartMemberUpdateToNewVersion() {
// Firstly, make a cluster with a part of new version servers.
Collection<Member> allMembers = manager.allMembers();
AtomicInteger count = new AtomicInteger();
allMembers.forEach(member -> {
if (count.get() == 0) {
member.setExtendInfo(oldVersionMeta);
} else {
count.incrementAndGet();
member.setExtendInfo(newVersionMeta);
}
});
judgement = new ClusterVersionJudgement(manager);
judgement.judge();
Assert.assertFalse(judgement.allMemberIsNewVersion());
// Secondly, make all in the cluster to be new version servers.
allMembers.forEach(member -> member.setExtendInfo(newVersionMeta));
judgement = new ClusterVersionJudgement(manager);
judgement.judge();
Assert.assertTrue(judgement.allMemberIsNewVersion());
}
}

View File

@ -1,152 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.impl.BasePersistentServiceProcessor;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.sys.env.Constants;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.env.MockEnvironment;
import java.lang.reflect.Field;
@RunWith(MockitoJUnitRunner.class)
public class PersistentConsistencyServiceDelegateImplTest {
@Mock
private ClusterVersionJudgement clusterVersionJudgement;
@Mock
private RaftConsistencyServiceImpl raftConsistencyService;
@Mock
private ProtocolManager protocolManager;
@Mock
private Record record;
@Mock
private RecordListener recordListener;
@Mock
private BasePersistentServiceProcessor basePersistentServiceProcessor;
private PersistentConsistencyServiceDelegateImpl oldPersistentConsistencyServiceDelegate;
private PersistentConsistencyServiceDelegateImpl newPersistentConsistencyServiceDelegate;
@Before
public void setUp() throws Exception {
MockEnvironment environment = new MockEnvironment();
environment.setProperty(Constants.SUPPORT_UPGRADE_FROM_1X, "true");
EnvUtil.setEnvironment(environment);
EnvUtil.setIsStandalone(true);
oldPersistentConsistencyServiceDelegate = new PersistentConsistencyServiceDelegateImpl(clusterVersionJudgement,
raftConsistencyService, protocolManager);
newPersistentConsistencyServiceDelegate = new PersistentConsistencyServiceDelegateImpl(clusterVersionJudgement,
raftConsistencyService, protocolManager);
Class<PersistentConsistencyServiceDelegateImpl> persistentConsistencyServiceDelegateClass = PersistentConsistencyServiceDelegateImpl.class;
Field switchNewPersistentService = persistentConsistencyServiceDelegateClass
.getDeclaredField("switchNewPersistentService");
switchNewPersistentService.setAccessible(true);
switchNewPersistentService.set(newPersistentConsistencyServiceDelegate, true);
Field newPersistentConsistencyService = persistentConsistencyServiceDelegateClass
.getDeclaredField("newPersistentConsistencyService");
newPersistentConsistencyService.setAccessible(true);
newPersistentConsistencyService.set(newPersistentConsistencyServiceDelegate, basePersistentServiceProcessor);
}
@Test()
public void testPut() throws Exception {
String key = "record_key";
oldPersistentConsistencyServiceDelegate.put(key, record);
Mockito.verify(raftConsistencyService).put(key, record);
newPersistentConsistencyServiceDelegate.put(key, record);
Mockito.verify(basePersistentServiceProcessor).put(key, record);
}
@Test
public void testRemove() throws NacosException {
String key = "record_key";
oldPersistentConsistencyServiceDelegate.remove(key);
Mockito.verify(raftConsistencyService).remove(key);
newPersistentConsistencyServiceDelegate.remove(key);
Mockito.verify(basePersistentServiceProcessor).remove(key);
}
@Test()
public void testGet() throws NacosException {
String key = "record_key";
oldPersistentConsistencyServiceDelegate.get(key);
Mockito.verify(raftConsistencyService).get(key);
newPersistentConsistencyServiceDelegate.get(key);
Mockito.verify(basePersistentServiceProcessor).get(key);
}
@Test
public void testListen() throws NacosException {
String key = "listen_key";
oldPersistentConsistencyServiceDelegate.listen(key, recordListener);
Mockito.verify(raftConsistencyService).listen(key, recordListener);
newPersistentConsistencyServiceDelegate.listen(key, recordListener);
Mockito.verify(basePersistentServiceProcessor).listen(key, recordListener);
}
@Test
public void testUnListen() throws NacosException {
String key = "listen_key";
oldPersistentConsistencyServiceDelegate.unListen(key, recordListener);
Mockito.verify(raftConsistencyService).unListen(key, recordListener);
newPersistentConsistencyServiceDelegate.unListen(key, recordListener);
Mockito.verify(basePersistentServiceProcessor).unListen(key, recordListener);
}
@Test
public void testIsAvailable() {
oldPersistentConsistencyServiceDelegate.isAvailable();
Mockito.verify(raftConsistencyService).isAvailable();
newPersistentConsistencyServiceDelegate.isAvailable();
Mockito.verify(basePersistentServiceProcessor).isAvailable();
}
@Test
public void testGetErrorMsg() {
oldPersistentConsistencyServiceDelegate.getErrorMsg();
Mockito.verify(raftConsistencyService).getErrorMsg();
newPersistentConsistencyServiceDelegate.getErrorMsg();
Mockito.verify(basePersistentServiceProcessor).getErrorMsg();
}
}

View File

@ -1,117 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.mock.env.MockEnvironment;
import org.springframework.mock.web.MockServletContext;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
public class RaftPeerSetTest {
@BeforeClass
public static void beforeClass() {
ApplicationUtils.injectContext(new StaticApplicationContext());
EnvUtil.setEnvironment(new MockEnvironment());
}
private ServerMemberManager memberManager;
@Before
public void before() throws Exception {
memberManager = new ServerMemberManager(new MockServletContext());
}
@Test
public void testRaftPeerChange() {
final AtomicBoolean notifyReceived = new AtomicBoolean(false);
RaftPeerSetCopy peerSetCopy = new RaftPeerSetCopy(memberManager, () -> {
notifyReceived.set(true);
});
Collection<Member> firstEvent = Arrays.asList(
Member.builder().ip("127.0.0.1").port(80).build(),
Member.builder().ip("127.0.0.2").port(81).build(),
Member.builder().ip("127.0.0.3").port(82).build());
peerSetCopy.changePeers(firstEvent);
Assert.assertTrue(notifyReceived.get());
notifyReceived.set(false);
Collection<Member> secondEvent = Arrays.asList(
Member.builder().ip("127.0.0.1").port(80).build(),
Member.builder().ip("127.0.0.2").port(81).build(),
Member.builder().ip("127.0.0.3").port(82).build(),
Member.builder().ip("127.0.0.4").port(83).build());
peerSetCopy.changePeers(secondEvent);
Assert.assertTrue(notifyReceived.get());
notifyReceived.set(false);
Collection<Member> thirdEvent = Arrays.asList(
Member.builder().ip("127.0.0.1").port(80).build(),
Member.builder().ip("127.0.0.2").port(81).build(),
Member.builder().ip("127.0.0.5").port(82).build());
peerSetCopy.changePeers(thirdEvent);
Assert.assertTrue(notifyReceived.get());
notifyReceived.set(false);
Collection<Member> fourEvent = Arrays.asList(
Member.builder().ip("127.0.0.1").port(80).build(),
Member.builder().ip("127.0.0.2").port(81).build());
peerSetCopy.changePeers(fourEvent);
Assert.assertTrue(notifyReceived.get());
notifyReceived.set(false);
Collection<Member> fiveEvent = Arrays.asList(
Member.builder().ip("127.0.0.1").port(80).build(),
Member.builder().ip("127.0.0.3").port(81).build());
peerSetCopy.changePeers(fiveEvent);
Assert.assertTrue(notifyReceived.get());
notifyReceived.set(false);
}
private static class RaftPeerSetCopy extends RaftPeerSet {
private final Runnable runnable;
public RaftPeerSetCopy(ServerMemberManager memberManager, Runnable callback) {
super(memberManager);
this.runnable = callback;
}
@Override
protected void changePeers(Collection<Member> members) {
this.runnable.run();
}
}
}

View File

@ -20,7 +20,6 @@ import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.constant.HttpHeaderConsts;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.BaseTest;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
@ -67,9 +66,6 @@ public class InstanceControllerTest extends BaseTest {
@Mock
private InstanceUpgradeHelper instanceUpgradeHelper;
@Mock
private RaftPeerSet peerSet;
private MockMvc mockmvc;
@Before

View File

@ -17,16 +17,12 @@
package com.alibaba.nacos.naming.controllers;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.cluster.ServerStatus;
import com.alibaba.nacos.naming.cluster.ServerStatusManager;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.SwitchManager;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.sys.env.Constants;
import com.alibaba.nacos.sys.env.EnvUtil;
@ -60,9 +56,6 @@ public class OperatorControllerTest {
@Mock
private SwitchDomain switchDomain;
@Mock
private SwitchManager switchManager;
@Mock
private ServerStatusManager serverStatusManager;
@ -72,9 +65,6 @@ public class OperatorControllerTest {
@Mock
private ClientManager clientManager;
@Mock
private RaftCore raftCore;
@Mock
private DistroMapper distroMapper;
@ -125,7 +115,6 @@ public class OperatorControllerTest {
Mockito.when(serverStatusManager.getServerStatus()).thenReturn(ServerStatus.UP);
Mockito.when(serviceManager.getResponsibleServiceCount()).thenReturn(1);
Mockito.when(serviceManager.getResponsibleInstanceCount()).thenReturn(1);
Mockito.when(raftCore.getNotifyTaskCount()).thenReturn(1);
Collection<String> clients = new HashSet<>();
clients.add("1628132208793_127.0.0.1_8080");
clients.add("127.0.0.1:8081#true");
@ -147,21 +136,6 @@ public class OperatorControllerTest {
Assert.assertEquals(3, objectNode.get("responsibleClientCount").asInt());
}
@Test
public void testGetResponsibleServer4Service() {
try {
Mockito.when(serviceManager.getService(Mockito.anyString(), Mockito.anyString())).thenReturn(new Service());
Mockito.when(distroMapper.mapSrv(Mockito.anyString())).thenReturn("test");
ObjectNode objectNode = operatorController.getResponsibleServer4Service("test", "test");
Assert.assertEquals("test", objectNode.get("responsibleServer").asText());
} catch (NacosException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
@Test
public void testGetResponsibleServer4Client() {
Mockito.when(distroMapper.mapSrv(Mockito.anyString())).thenReturn("test");

View File

@ -85,7 +85,7 @@ public class ServiceManagerTest extends BaseTest {
@Before
public void before() {
super.before();
serviceManager = new ServiceManager(switchDomain, distroMapper, serverMemberManager, pushService, peerSet);
serviceManager = new ServiceManager(switchDomain, distroMapper, serverMemberManager, pushService);
ReflectionTestUtils.setField(serviceManager, "consistencyService", consistencyService);
ReflectionTestUtils.setField(serviceManager, "synchronizer", synchronizer);
mockInjectSwitchDomain();

View File

@ -1,33 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
public class MockSelfUpgradeChecker implements SelfUpgradeChecker {
@Override
public String checkType() {
return "mock";
}
@Override
public boolean isReadyToUpgrade(ServiceManager serviceManager, DoubleWriteDelayTaskEngine taskEngine) {
return true;
}
}

View File

@ -1,351 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.sys.env.Constants;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.mock.env.MockEnvironment;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class UpgradeJudgementTest {
private final long sleepForCheck = 800L;
@Mock
private ConfigurableApplicationContext context;
@Mock
private RaftPeerSet raftPeerSet;
@Mock
private RaftCore raftCore;
@Mock
private ClusterVersionJudgement versionJudgement;
@Mock
private ServerMemberManager memberManager;
@Mock
private ServiceManager serviceManager;
@Mock
private DoubleWriteDelayTaskEngine doubleWriteDelayTaskEngine;
@Mock
private UpgradeStates upgradeStates;
@Mock
private ServiceStorage serviceStorage;
private UpgradeJudgement upgradeJudgement;
@Before
public void setUp() throws Exception {
MockEnvironment environment = new MockEnvironment();
environment.setProperty(Constants.SUPPORT_UPGRADE_FROM_1X, "true");
EnvUtil.setEnvironment(environment);
EnvUtil.setIsStandalone(false);
when(context.getBean(ServiceManager.class)).thenReturn(serviceManager);
when(context.getBean(ServiceStorage.class)).thenReturn(serviceStorage);
ApplicationUtils.injectContext(context);
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
NotifyCenter.deregisterSubscriber(upgradeJudgement);
MetricsMonitor.getIpCountMonitor().set(0);
MetricsMonitor.getDomCountMonitor().set(0);
}
@After
public void tearDown() {
upgradeJudgement.shutdown();
EnvUtil.setEnvironment(null);
}
@Test
public void testUpgradeOneNode() throws Exception {
Collection<Member> members = mockMember("1.3.2", "1.3.2", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradeOneFor14XNode() throws Exception {
Collection<Member> members = mockMember("1.4.0", "2.0.0", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradeTwoNode() throws Exception {
Collection<Member> members = mockMember("", "2.0.0", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradeCheckSucc() throws Exception {
Collection<Member> members = mockMember("2.0.0-snapshot", "2.0.0", "2.0.0");
Iterator<Member> iterator = members.iterator();
when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(true);
iterator.next();
while (iterator.hasNext()) {
iterator.next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true);
}
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertTrue(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradeCheckSelfFail() throws Exception {
Collection<Member> members = mockMember("2.0.0", "2.0.0", "2.0.0");
Iterator<Member> iterator = members.iterator();
when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(false);
iterator.next();
while (iterator.hasNext()) {
iterator.next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true);
}
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testAlreadyUpgradedAndCheckSelfFail() throws Exception {
Collection<Member> members = mockMember("2.0.0", "2.0.0", "2.0.0");
Iterator<Member> iterator = members.iterator();
iterator.next();
while (iterator.hasNext()) {
iterator.next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true);
}
when(upgradeStates.isUpgraded()).thenReturn(true);
upgradeJudgement.shutdown();
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertTrue(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradeCheckOthersFail() throws Exception {
Collection<Member> members = mockMember("2.0.0", "2.0.0", "2.0.0");
when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(true);
members.iterator().next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testAlreadyUpgradedAndCheckOthersFail() throws Exception {
Collection<Member> members = mockMember("2.0.0", "2.0.0", "2.0.0");
members.iterator().next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true);
upgradeJudgement.shutdown();
when(upgradeStates.isUpgraded()).thenReturn(true);
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
assertTrue(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testDowngradeOneFor14XNode() throws Exception {
upgradeJudgement.setUseGrpcFeatures(true);
upgradeJudgement.setUseJraftFeatures(true);
Collection<Member> members = mockMember("1.4.0", "2.0.0", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testAlreadyUpgradedAndDowngradeOneFor14XNode() throws Exception {
when(upgradeStates.isUpgraded()).thenReturn(true);
upgradeJudgement.shutdown();
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(mockMember("1.4.0", "2.0.0", "2.0.0")).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testDowngradeTwoNode() throws Exception {
upgradeJudgement.setUseGrpcFeatures(true);
upgradeJudgement.setUseJraftFeatures(true);
Collection<Member> members = mockMember("", "", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
verify(raftPeerSet, atMostOnce()).init();
verify(raftCore, atMostOnce()).init();
verify(versionJudgement, atMostOnce()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testAlreadyUpgradedAndDowngradeTwoNode() throws Exception {
when(upgradeStates.isUpgraded()).thenReturn(true);
upgradeJudgement.shutdown();
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(mockMember("", "", "2.0.0")).build());
verify(raftPeerSet, atMostOnce()).init();
verify(raftCore, atMostOnce()).init();
verify(versionJudgement, atMostOnce()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testDowngradeOneNode() throws Exception {
upgradeJudgement.setUseGrpcFeatures(true);
upgradeJudgement.setUseJraftFeatures(true);
Collection<Member> members = mockMember("1.3.2", "2.0.0", "2.0.0");
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
verify(raftPeerSet, atMostOnce()).init();
verify(raftCore, atMostOnce()).init();
verify(versionJudgement, atMostOnce()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testAlreadyUpgradedAndDowngradeOneNode() throws Exception {
when(upgradeStates.isUpgraded()).thenReturn(true);
upgradeJudgement.shutdown();
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(mockMember("1.3.2", "2.0.0", "2.0.0")).build());
verify(raftPeerSet, atMostOnce()).init();
verify(raftCore, atMostOnce()).init();
verify(versionJudgement, atMostOnce()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertFalse(upgradeJudgement.isUseJraftFeatures());
}
@Test
public void testUpgradedBySpecifiedSelfUpgradeChecker() throws InterruptedException {
upgradeJudgement.shutdown();
MockEnvironment mockEnvironment = new MockEnvironment();
mockEnvironment.setProperty("upgrading.checker.type", "mock");
mockEnvironment.setProperty(Constants.SUPPORT_UPGRADE_FROM_1X, "true");
EnvUtil.setEnvironment(mockEnvironment);
mockMember("1.3.2", "2.0.0", "2.0.0");
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertTrue((Boolean) memberManager.getSelf().getExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE));
}
private Collection<Member> mockMember(String... versions) {
Collection<Member> result = new HashSet<>();
for (int i = 0; i < versions.length; i++) {
Member member = new Member();
member.setPort(i);
if (StringUtils.isNotBlank(versions[i])) {
member.setExtendVal(MemberMetaDataConstants.VERSION, versions[i]);
}
result.add(member);
}
when(memberManager.getSelf()).thenReturn(result.iterator().next());
when(memberManager.allMembers()).thenReturn(result);
return result;
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright 1999-2021 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.upgrade;
import org.junit.Assert;
import org.junit.Test;
/**
* {@link UpgradeStates} unit tests.
*
* @author chenglu
* @date 2021-09-01 22:04
*/
public class UpgradeStatesTest {
private UpgradeStates upgradeStates = new UpgradeStates();
@Test
public void testOnEvent() {
UpgradeStates.UpgradeStateChangedEvent changedEvent = new UpgradeStates.UpgradeStateChangedEvent(true);
upgradeStates.onEvent(changedEvent);
Assert.assertTrue(upgradeStates.isUpgraded());
}
}

View File

@ -1,78 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.raft;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.BaseTest;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftStore;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.sys.env.Constants;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
public class RaftStoreTest extends BaseTest {
public RaftCore raftCore;
public RaftStore raftStore;
@Mock
private ClusterVersionJudgement versionJudgement;
@Before
public void setUp() {
super.before();
environment.setProperty(Constants.SUPPORT_UPGRADE_FROM_1X, "true");
raftStore = new RaftStore();
raftCore = new RaftCore(peerSet, switchDomain, null, null, raftStore, versionJudgement, null);
}
@After
public void tearDown() throws NacosException {
raftCore.shutdown();
raftStore.shutdown();
}
@Test
public void wrietDatum() throws Exception {
Datum<Instances> datum = new Datum<>();
String key = KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, false);
datum.key = key;
datum.timestamp.getAndIncrement();
datum.value = new Instances();
Instance instance = new Instance("1.1.1.1", 1, TEST_CLUSTER_NAME);
datum.value.getInstanceList().add(instance);
instance = new Instance("2.2.2.2", 2, TEST_CLUSTER_NAME);
datum.value.getInstanceList().add(instance);
raftStore.write(datum);
raftCore.init();
Datum result = raftCore.getDatum(key);
Assert.assertEquals(key, result.key);
Assert.assertEquals(1, result.timestamp.intValue());
Assert.assertEquals(datum.value.toString(), result.value.toString());
}
}