From 52ea9674be5882d830e58d32e1e16018343176a7 Mon Sep 17 00:00:00 2001
From: chuntaojun
Date: Thu, 14 May 2020 10:15:35 +0800
Subject: [PATCH] fix: cluster startup mode adjustment

---
 .../core/controller/CoreOpsController.java    |  28 ++++-
 .../core/distributed/ProtocolManager.java     |   5 +-
 .../distributed/id/IdGeneratorManager.java    |   3 +
 .../raft/JRaftMaintainService.java            |   7 +-
 .../core/distributed/raft/JRaftProtocol.java  |  30 ++---
 .../core/distributed/raft/JRaftServer.java    | 117 ++++++++++--------
 .../raft/utils/FailoverClosureImpl.java       |  37 +-----
 .../raft/utils/JRaftConstants.java            |   4 +
 .../core/distributed/raft/utils/JRaftOps.java |  40 ++++++
 9 files changed, 156 insertions(+), 115 deletions(-)

diff --git a/core/src/main/java/com/alibaba/nacos/core/controller/CoreOpsController.java b/core/src/main/java/com/alibaba/nacos/core/controller/CoreOpsController.java
index 66b9ff7de..cb2dbb430 100644
--- a/core/src/main/java/com/alibaba/nacos/core/controller/CoreOpsController.java
+++ b/core/src/main/java/com/alibaba/nacos/core/controller/CoreOpsController.java
@@ -17,14 +17,17 @@
 package com.alibaba.nacos.core.controller;
 
 import com.alibaba.nacos.common.model.RestResult;
-import com.alibaba.nacos.consistency.cp.CPProtocol;
+import com.alibaba.nacos.common.model.RestResultUtils;
+import com.alibaba.nacos.core.distributed.ProtocolManager;
+import com.alibaba.nacos.core.distributed.id.IdGeneratorManager;
 import com.alibaba.nacos.core.utils.Commons;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -34,8 +37,14 @@ import java.util.Map;
 @RequestMapping(Commons.NACOS_CORE_CONTEXT + "/ops")
 public class CoreOpsController {
 
-    @Autowired
-    private CPProtocol cpProtocol;
+    private final ProtocolManager protocolManager;
+    private final IdGeneratorManager idGeneratorManager;
+
+    public CoreOpsController(ProtocolManager protocolManager,
+            IdGeneratorManager idGeneratorManager) {
+        this.protocolManager = protocolManager;
+        this.idGeneratorManager = idGeneratorManager;
+    }
 
     // Temporarily overpassed the raft operations interface
     // {
@@ -48,7 +57,16 @@ public class CoreOpsController {
 
     @PostMapping(value = "/raft")
     public RestResult<String> raftOps(@RequestBody Map<String, String> commands) {
-        return cpProtocol.execute(commands);
+        return protocolManager.getCpProtocol().execute(commands);
     }
 
+    @GetMapping(value = "/idInfo")
+    public RestResult<Map<String, Map<Object, Object>>> idInfo() {
+        Map<String, Map<Object, Object>> info = new HashMap<>();
+        idGeneratorManager.getGeneratorMap()
+                .forEach(
+                        (resource, idGenerator) -> info.put(resource, idGenerator.info()));
+        return RestResultUtils.success(info);
+    }
+
 }
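The controller now pulls the CP protocol from ProtocolManager and exposes the id-generator state read-only. A quick way to exercise both endpoints is a plain HTTP client; the sketch below is illustrative only, and the host, port, and "transferLeader" payload (taken from the comment in the controller) are assumptions, not part of the patch.

    import java.util.Collections;
    import org.springframework.web.client.RestTemplate;

    public class CoreOpsClientSketch {
        public static void main(String[] args) {
            RestTemplate rest = new RestTemplate();
            // POST a single-command body to the raft ops endpoint, e.g. a leader transfer
            String raftResult = rest.postForObject(
                    "http://127.0.0.1:8848/nacos/v1/core/ops/raft",
                    Collections.singletonMap("transferLeader", "127.0.0.1:7848"),
                    String.class);
            // GET the per-resource id-generator info exposed by the new /idInfo endpoint
            String idInfo = rest.getForObject(
                    "http://127.0.0.1:8848/nacos/v1/core/ops/idInfo", String.class);
            System.out.println(raftResult);
            System.out.println(idInfo);
        }
    }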
diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java b/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java
index 9274e81d7..98adde118 100644
--- a/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java
+++ b/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java
@@ -157,7 +157,10 @@ public class ProtocolManager
 
         Set<Member> copy = new HashSet<>(event.getMembers());
 
-        // Node change events between different protocols should not block each other
+        // Node change events between different protocols should not block each other.
+        // A single-threaded pool is used to inform the consistency layer of node
+        // changes, so that multiple tasks never apply a node-change operation to
+        // the consistency layer at the same time.
         if (Objects.nonNull(apProtocol)) {
             ProtocolExecutor.apMemberChange(() -> apProtocol.memberChange(toAPMembersInfo(copy)));
         }
diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/id/IdGeneratorManager.java b/core/src/main/java/com/alibaba/nacos/core/distributed/id/IdGeneratorManager.java
index a9cc35628..6ac3bacde 100644
--- a/core/src/main/java/com/alibaba/nacos/core/distributed/id/IdGeneratorManager.java
+++ b/core/src/main/java/com/alibaba/nacos/core/distributed/id/IdGeneratorManager.java
@@ -70,4 +70,7 @@ public class IdGeneratorManager {
                         "ID resource for the time being.");
     }
 
+    public Map<String, IdGenerator> getGeneratorMap() {
+        return generatorMap;
+    }
 }
diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftMaintainService.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftMaintainService.java
index dbc102cda..cc663897b 100644
--- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftMaintainService.java
+++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftMaintainService.java
@@ -70,13 +70,12 @@ public class JRaftMaintainService {
             if (node == null) {
                 return RestResultUtils.failed("not this raft group : " + groupId);
             }
-            final Configuration conf = node.getOptions().getInitialConf();
             final String command = args.keySet().iterator().next();
-            JRaftOps commandEnums = JRaftOps.sourceOf(command);
-            if (Objects.isNull(commandEnums)) {
+            JRaftOps ops = JRaftOps.sourceOf(command);
+            if (Objects.isNull(ops)) {
                 return RestResultUtils.failed("Not support command");
             }
-            return commandEnums.execute(cliService, groupId, node, args);
+            return ops.execute(cliService, groupId, node, args);
         }
         catch (Throwable ex) {
             return RestResultUtils.failed(ex.getMessage());
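The ProtocolManager comment above describes serializing member-change callbacks. The idea reduces to a dedicated single-threaded executor per protocol, as in this standalone sketch (the names here are illustrative; ProtocolExecutor is the patch's actual dispatcher):

    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Consumer;

    class MemberChangeSerializer {
        // One thread per protocol: queued tasks run strictly one at a time, so the
        // consistency layer never sees two concurrent memberChange invocations.
        private final ExecutorService cpMemberChange = Executors.newSingleThreadExecutor();

        void onMemberChange(Set<String> members, Consumer<Set<String>> protocol) {
            cpMemberChange.execute(() -> protocol.accept(members));
        }
    }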
diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java
index f951a2b52..252f54038 100644
--- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java
+++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java
@@ -17,7 +17,7 @@
 package com.alibaba.nacos.core.distributed.raft;
 
 import com.alibaba.nacos.common.model.RestResult;
-import com.alibaba.nacos.common.utils.ConvertUtils;
+import com.alibaba.nacos.common.utils.ThreadUtils;
 import com.alibaba.nacos.consistency.LogFuture;
 import com.alibaba.nacos.consistency.ProtocolMetaData;
 import com.alibaba.nacos.consistency.cp.CPProtocol;
@@ -109,12 +109,10 @@ public class JRaftProtocol
     private Node raftNode;
     private ServerMemberManager memberManager;
     private String selfAddress = InetUtils.getSelfIp();
-    private int failoverRetries = 1;
-    private String failoverRetriesStr = String.valueOf(failoverRetries);
 
     public JRaftProtocol(ServerMemberManager memberManager) throws Exception {
         this.memberManager = memberManager;
-        this.raftServer = new JRaftServer(failoverRetries);
+        this.raftServer = new JRaftServer();
         this.jRaftMaintainService = new JRaftMaintainService(raftServer);
     }
 
@@ -124,10 +122,6 @@ public class JRaftProtocol
         this.raftConfig = config;
         this.selfAddress = memberManager.getSelf().getAddress();
         NotifyCenter.registerToSharePublisher(RaftEvent.class);
-        this.failoverRetries = ConvertUtils
-                .toInt(config.getVal(RaftSysConstants.REQUEST_FAILOVER_RETRIES), 1);
-        this.failoverRetriesStr = String.valueOf(failoverRetries);
-        this.raftServer.setFailoverRetries(failoverRetries);
         this.raftServer.init(this.raftConfig);
         this.raftServer.start();
 
@@ -178,10 +172,7 @@ public class JRaftProtocol
 
     @Override
     public GetResponse getData(GetRequest request) throws Exception {
-        int retryCnt = Integer.parseInt(
-                request.getExtendInfoOrDefault(RaftSysConstants.REQUEST_FAILOVER_RETRIES,
-                        failoverRetriesStr));
-        return raftServer.get(request, retryCnt);
+        return raftServer.get(request);
     }
 
     @Override
@@ -193,16 +184,12 @@ public class JRaftProtocol
 
     @Override
     public CompletableFuture<LogFuture> submitAsync(Log data) {
-        int retryCnt = Integer.parseInt(
-                data.getExtendInfoOrDefault(RaftSysConstants.REQUEST_FAILOVER_RETRIES,
-                        failoverRetriesStr));
         final Throwable[] throwable = new Throwable[] { null };
         CompletableFuture<LogFuture> future = new CompletableFuture<>();
         try {
             CompletableFuture<Object> f = new CompletableFuture<>();
             raftServer.commit(JRaftUtils
-                    .injectExtendInfo(data, JRaftLogOperation.MODIFY_OPERATION), f,
-                    retryCnt).whenComplete(new BiConsumer<Object, Throwable>() {
+                    .injectExtendInfo(data, JRaftLogOperation.MODIFY_OPERATION), f).whenComplete(new BiConsumer<Object, Throwable>() {
                 @Override
                 public void accept(Object o, Throwable throwable) {
                     future.complete(LogFuture.create(o, throwable));
@@ -220,7 +207,14 @@ public class JRaftProtocol
 
     @Override
     public void memberChange(Set<String> addresses) {
-        this.raftConfig.setMembers(raftConfig.getSelfMember(), addresses);
+        for (int i = 0; i < 5; i++) {
+            if (this.raftServer.peerChange(jRaftMaintainService, addresses)) {
+                break;
+            } else {
+                Loggers.RAFT.warn("peer removal failed");
+            }
+            ThreadUtils.sleep(100L);
+        }
     }
 
     @Override
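memberChange above now retries peerChange up to five times with a 100 ms pause instead of rewriting the raft config directly. The retry shape, extracted as a generic helper (a sketch, assuming the operation is idempotent, as peerChange is):

    import java.util.function.BooleanSupplier;

    final class BoundedRetry {
        // Runs op until it reports success or the attempt budget is exhausted.
        static boolean run(BooleanSupplier op, int attempts, long backoffMs)
                throws InterruptedException {
            for (int i = 0; i < attempts; i++) {
                if (op.getAsBoolean()) {
                    return true;
                }
                Thread.sleep(backoffMs);
            }
            return false;
        }
    }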
diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftServer.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftServer.java
index a03b7e8dd..836a16d8c 100644
--- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftServer.java
+++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftServer.java
@@ -65,22 +65,27 @@ import com.alipay.sofa.jraft.option.NodeOptions;
 import com.alipay.sofa.jraft.option.RaftOptions;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import com.alipay.sofa.jraft.util.BytesUtil;
+import com.google.common.base.Joiner;
+import org.slf4j.Logger;
 import org.springframework.util.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
 /**
@@ -138,8 +143,7 @@ public class JRaftServer {
         System.getProperties().setProperty("bolt.netty.buffer.high.watermark",
                 String.valueOf(256 * 1024 * 1024));
     }
 
-    public JRaftServer(final int failoverRetries) throws Exception {
-        this.failoverRetries = failoverRetries;
+    public JRaftServer() throws Exception {
         this.conf = new Configuration();
     }
 
@@ -243,21 +247,6 @@ public class JRaftServer {
             NodeOptions copy = nodeOptions.copy();
             JRaftUtils.initDirectory(parentPath, groupName, copy);
 
-            if (!registerSelfToCluster(groupName, localPeerId, configuration.copy())) {
-                // If the registration fails, you need to remove yourself first and then
-                // turn on the repeat registration logic
-                RaftExecutor.executeByCommon(() -> {
-                    Configuration c = configuration.copy();
-                    c.addPeer(localPeerId);
-                    for (; ; ) {
-                        if (registerSelfToCluster(groupName, localPeerId, c)) {
-                            break;
-                        }
-                        ThreadUtils.sleep(1000L);
-                    }
-                });
-            }
-
             // Here, the LogProcessor is passed into StateMachine, and when the StateMachine
             // triggers onApply, the onApply of the LogProcessor is actually called
             NacosStateMachine machine = new NacosStateMachine(this, processor);
@@ -284,6 +273,8 @@ public class JRaftServer {
             machine.setNode(node);
             RouteTable.getInstance().updateConfiguration(groupName, configuration);
 
+            RaftExecutor.executeByCommon(() -> registerSelfToCluster(groupName, localPeerId, configuration));
+
             // Turn on the leader auto refresh for this group
             Random random = new Random();
             long period = nodeOptions.getElectionTimeoutMs() + random.nextInt(5 * 1000);
@@ -294,7 +285,7 @@ public class JRaftServer {
         }
     }
 
-    GetResponse get(final GetRequest request, final int failoverRetries) {
+    GetResponse get(final GetRequest request) {
         final String group = request.getGroup();
         CompletableFuture<GetResponse> future = new CompletableFuture<>();
         final RaftGroupTuple tuple = findTupleByGroup(group);
@@ -323,7 +314,8 @@ public class JRaftServer {
         try {
             return future.get(rpcRequestTimeoutMs, TimeUnit.MILLISECONDS);
         }
-        catch (TimeoutException e) {
+        catch (Throwable e) {
+            Loggers.RAFT.warn("Raft linearizable read failed, falling back to leader read : {}", e.toString());
             // run raft read
             readThrouthRaftLog(request, future);
             try {
@@ -331,12 +323,9 @@ public class JRaftServer {
                 return future.get(rpcRequestTimeoutMs, TimeUnit.MILLISECONDS);
             }
             catch (Throwable ex) {
                 throw new ConsistencyException(
-                        "Data acquisition failed : " + e.toString());
+                        "Data acquisition failed : " + e.toString() + ", read from leader has error : " + ex.toString());
             }
         }
-        catch (Throwable e) {
-            throw new ConsistencyException("Data acquisition failed : " + e.toString());
-        }
     }
 
@@ -347,7 +336,7 @@ public void readThrouthRaftLog(final GetRequest request,
                 .putExtendInfo(JRaftConstants.JRAFT_EXTEND_INFO_KEY,
                         JRaftLogOperation.READ_OPERATION).build();
         CompletableFuture<byte[]> f = new CompletableFuture<>();
-        commit(readLog, f, failoverRetries)
+        commit(readLog, f)
                 .whenComplete(new BiConsumer<byte[], Throwable>() {
                     @Override
                     public void accept(byte[] result, Throwable throwable) {
@@ -373,8 +362,7 @@ public class JRaftServer {
                 });
     }
 
-    public <T> CompletableFuture<T> commit(Log data, final CompletableFuture<T> future,
-            final int retryLeft) {
+    public <T> CompletableFuture<T> commit(Log data, final CompletableFuture<T> future) {
         LoggerUtils
                 .printIfDebugEnabled(Loggers.RAFT, "data requested this time : {}", data);
         final String group = data.getGroup();
@@ -384,8 +372,7 @@ public class JRaftServer {
                     "No corresponding Raft Group found : " + group));
             return future;
         }
-        RetryRunner runner = () -> commit(data, future, retryLeft - 1);
-        FailoverClosureImpl<T> closure = new FailoverClosureImpl<>(future, retryLeft, runner);
+        FailoverClosureImpl<T> closure = new FailoverClosureImpl<>(future);
 
         final Node node = tuple.node;
         if (node.isLeader()) {
@@ -399,36 +386,27 @@ public class JRaftServer {
         return future;
     }
 
-    boolean registerSelfToCluster(String groupId, PeerId selfIp, Configuration conf) {
-        conf.removePeer(localPeerId);
-        PeerId leader = new PeerId();
-        for (int i = 0; i < 5; i++) {
-            Status status = cliService.getLeader(groupId, conf, leader);
-            if (status.isOk()) {
-                break;
+    /**
+     * Add this node to the Raft cluster; blocks and retries until the node
+     * has successfully joined the group.
+     *
+     * @param groupId raft group
+     * @param selfIp  local raft node address
+     * @param conf    {@link Configuration} without self info
+     */
+    void registerSelfToCluster(String groupId, PeerId selfIp, Configuration conf) {
+        for (; ; ) {
+            List<PeerId> peerIds = cliService.getPeers(groupId, conf);
+            if (peerIds.contains(selfIp)) {
+                return;
             }
-            ThreadUtils.sleep(100L);
-            Loggers.RAFT.warn("get leader failed : {}", status);
-        }
-
-        // This means that this is a new cluster, following the normal initialization logic
-        if (leader.isEmpty()) {
-            return true;
-        }
-
-        for (int i = 0; i < 5; i++) {
             Status status = cliService.addPeer(groupId, conf, selfIp);
             if (status.isOk()) {
-                Loggers.RAFT.info("reigister self to cluster success");
-                return true;
-            }
-            else {
-                Loggers.RAFT.error("register self to cluster has error : {}", status);
-                ThreadUtils.sleep(1_000L);
+                return;
             }
+            Loggers.RAFT.warn("Failed to join the cluster, retrying...");
+            ThreadUtils.sleep(1_000L);
         }
-
-        return false;
     }
 
     protected PeerId getLeader(final String raftGroupId) {
@@ -535,6 +513,37 @@ public class JRaftServer {
         }
     }
 
+    boolean peerChange(JRaftMaintainService maintainService, Set<String> newPeers) {
+        Set<String> oldPeers = new HashSet<>(this.raftConfig.getMembers());
+        oldPeers.removeAll(newPeers);
+
+        if (oldPeers.isEmpty()) {
+            return true;
+        }
+
+        Set<String> waitRemove = oldPeers;
+        AtomicInteger successCnt = new AtomicInteger(0);
+        multiRaftGroup.forEach(new BiConsumer<String, RaftGroupTuple>() {
+            @Override
+            public void accept(String group, RaftGroupTuple tuple) {
+                final Node node = tuple.getNode();
+                if (!node.isLeader()) {
+                    return;
+                }
+                Map<String, String> params = new HashMap<>();
+                params.put(JRaftConstants.GROUP_ID, group);
+                params.put(JRaftConstants.REMOVE_PEERS, Joiner.on(",").join(waitRemove));
+                RestResult<String> result = maintainService.execute(params);
+                if (result.ok()) {
+                    successCnt.incrementAndGet();
+                }
+            }
+        });
+        this.raftConfig.setMembers(localPeerId.toString(), newPeers);
+
+        return successCnt.get() == multiRaftGroup.size();
+    }
+
     void refreshRouteTable(String group) {
         if (isShutdown) {
             return;
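One subtlety in peerChange above: the old-peer diff must use removeAll, because Set.remove(Object) with a whole collection as its argument matches nothing and would leave the diff unchanged. A minimal demonstration (the addresses are made up):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    class PeerDiffDemo {
        public static void main(String[] args) {
            Set<String> oldPeers = new HashSet<>(Arrays.asList("a:7848", "b:7848", "c:7848"));
            Set<String> newPeers = new HashSet<>(Arrays.asList("a:7848", "b:7848"));
            oldPeers.remove(newPeers);     // no-op: no element equals the set itself
            oldPeers.removeAll(newPeers);  // leaves exactly the peers to remove
            System.out.println(oldPeers);  // prints [c:7848]
        }
    }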
diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/FailoverClosureImpl.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/FailoverClosureImpl.java
index 200cb6abe..b1ca3d8af 100644
--- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/FailoverClosureImpl.java
+++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/FailoverClosureImpl.java
@@ -17,10 +17,7 @@
 package com.alibaba.nacos.core.distributed.raft.utils;
 
 import com.alibaba.nacos.consistency.exception.ConsistencyException;
-import com.alibaba.nacos.core.distributed.raft.exception.NoLeaderException;
-import com.alibaba.nacos.core.utils.Loggers;
 import com.alipay.sofa.jraft.Status;
 
-import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -32,17 +29,11 @@
 public class FailoverClosureImpl<T> implements FailoverClosure<T> {
 
     private final CompletableFuture<T> future;
-    private final int retriesLeft;
-    private final RetryRunner retryRunner;
     private volatile T data;
     private volatile Throwable throwable;
 
-    public FailoverClosureImpl(final CompletableFuture<T> future,
-            final int retriesLeft,
-            final RetryRunner retryRunner) {
+    public FailoverClosureImpl(final CompletableFuture<T> future) {
         this.future = future;
-        this.retriesLeft = retriesLeft;
-        this.retryRunner = retryRunner;
     }
 
     @Override
@@ -62,31 +53,11 @@ public class FailoverClosureImpl<T> implements FailoverClosure<T> {
             return;
         }
 
         final Throwable throwable = this.throwable;
-        if (retriesLeft >= 0 && canRetryException(throwable)) {
-            Loggers.RAFT.warn("[Failover] status: {}, error: {}, [{}] retries left.", status,
-                    throwable, this.retriesLeft);
-            this.retryRunner.run();
+        if (Objects.nonNull(throwable)) {
+            future.completeExceptionally(new ConsistencyException(throwable));
         }
         else {
-            if (this.retriesLeft <= 0) {
-                Loggers.RAFT.error("[InvalidEpoch-Failover] status: {}, error: {}, {} retries left.",
-                        status, throwable,
-                        this.retriesLeft);
-            }
-            if (Objects.nonNull(throwable)) {
-                future.completeExceptionally(new ConsistencyException(throwable));
-            } else {
-                future.completeExceptionally(new ConsistencyException("Maximum number of retries has been reached"));
-            }
+            future.completeExceptionally(new ConsistencyException("operation failure"));
         }
     }
 
-    protected boolean canRetryException(Throwable throwable) {
-        if (throwable == null) {
-            return false;
-        }
-        if (throwable instanceof NoLeaderException) {
-            return true;
-        }
-        return throwable instanceof IOException;
-    }
 }
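With the retry machinery gone, FailoverClosureImpl's only remaining job is to settle the caller's future exactly once, either normally or exceptionally. The contract in isolation (a sketch with simplified types, not the class's actual API):

    import java.util.concurrent.CompletableFuture;

    final class SettleOnce {
        // Mirrors the simplified run(Status): an error wins, otherwise complete with data.
        static <T> void settle(CompletableFuture<T> future, T data, Throwable error) {
            if (error != null) {
                future.completeExceptionally(error);
            }
            else {
                future.complete(data);
            }
        }
    }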
diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftConstants.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftConstants.java
index 06ec61cda..ed261bbb7 100644
--- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftConstants.java
+++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftConstants.java
@@ -34,4 +34,8 @@ public class JRaftConstants {
 
     public static final String REMOVE_PEER = "removePeer";
 
+    public static final String REMOVE_PEERS = "removePeers";
+
+    public static final String CHANGE_PEERS = "changePeers";
+
 }
diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftOps.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftOps.java
index 1b419c4ff..a2c32444d 100644
--- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftOps.java
+++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftOps.java
@@ -97,6 +97,46 @@ public enum JRaftOps {
         }
     },
 
+    REMOVE_PEERS("removePeers") {
+        @Override
+        public RestResult<String> execute(CliService cliService, String groupId,
+                Node node, Map<String, String> args) {
+            final Configuration conf = node.getOptions().getInitialConf();
+            final String peers = args.get(JRaftConstants.REMOVE_PEERS);
+            for (String s : peers.split(",")) {
+                final PeerId waitRemove = PeerId.parsePeer(s);
+                Status status = cliService.removePeer(groupId, conf, waitRemove);
+                if (!status.isOk()) {
+                    return RestResultUtils.failed(status.getErrorMsg());
+                }
+            }
+            return RestResultUtils.success();
+        }
+    },
+
+    CHANGE_PEERS("changePeers") {
+        @Override
+        public RestResult<String> execute(CliService cliService, String groupId,
+                Node node, Map<String, String> args) {
+            final Configuration conf = node.getOptions().getInitialConf();
+            final Configuration newConf = new Configuration();
+            String peers = args.get(JRaftConstants.CHANGE_PEERS);
+            for (String peer : peers.split(",")) {
+                newConf.addPeer(PeerId.parsePeer(peer.trim()));
+            }
+
+            if (Objects.equals(conf, newConf)) {
+                return RestResultUtils.success();
+            }
+
+            Status status = cliService.changePeers(groupId, conf, newConf);
+            if (status.isOk()) {
+                return RestResultUtils.success();
+            }
+            return RestResultUtils.failed(status.getErrorMsg());
+        }
+    }
+
     ;
 
     private String name;
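The new enum constants are driven through JRaftMaintainService with a one-command argument map. An assumed usage sketch: "removePeers" matches the constant added above, but the GROUP_ID constant's literal value is not shown in this patch, so "groupId" here is a guess.

    import java.util.HashMap;
    import java.util.Map;

    class MaintainArgsSketch {
        static Map<String, String> removePeersArgs(String group, String... peers) {
            Map<String, String> args = new HashMap<>();
            args.put("groupId", group);                       // JRaftConstants.GROUP_ID (assumed value)
            args.put("removePeers", String.join(",", peers)); // JRaftConstants.REMOVE_PEERS
            return args;
        }
    }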