fix: cluster startup mode adjustment

This commit is contained in:
chuntaojun 2020-05-14 10:15:35 +08:00
parent a68f5e22cf
commit 52ea9674be
9 changed files with 156 additions and 115 deletions

View File

@ -17,14 +17,17 @@
package com.alibaba.nacos.core.controller; package com.alibaba.nacos.core.controller;
import com.alibaba.nacos.common.model.RestResult; import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.consistency.cp.CPProtocol; import com.alibaba.nacos.common.model.RestResultUtils;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.distributed.id.IdGeneratorManager;
import com.alibaba.nacos.core.utils.Commons; import com.alibaba.nacos.core.utils.Commons;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
@ -34,8 +37,14 @@ import java.util.Map;
@RequestMapping(Commons.NACOS_CORE_CONTEXT + "/ops") @RequestMapping(Commons.NACOS_CORE_CONTEXT + "/ops")
public class CoreOpsController { public class CoreOpsController {
@Autowired private final ProtocolManager protocolManager;
private CPProtocol cpProtocol; private final IdGeneratorManager idGeneratorManager;
public CoreOpsController(ProtocolManager protocolManager,
IdGeneratorManager idGeneratorManager) {
this.protocolManager = protocolManager;
this.idGeneratorManager = idGeneratorManager;
}
// Temporarily overpassed the raft operations interface // Temporarily overpassed the raft operations interface
// { // {
@ -48,7 +57,16 @@ public class CoreOpsController {
@PostMapping(value = "/raft") @PostMapping(value = "/raft")
public RestResult<String> raftOps(@RequestBody Map<String, String> commands) { public RestResult<String> raftOps(@RequestBody Map<String, String> commands) {
return cpProtocol.execute(commands); return protocolManager.getCpProtocol().execute(commands);
}
@GetMapping(value = "/idInfo")
public RestResult<Map<String, Map<Object, Object>>> idInfo() {
Map<String, Map<Object, Object>> info = new HashMap<>();
idGeneratorManager.getGeneratorMap()
.forEach(
(resource, idGenerator) -> info.put(resource, idGenerator.info()));
return RestResultUtils.success(info);
} }
} }

View File

@ -157,7 +157,10 @@ public class ProtocolManager
Set<Member> copy = new HashSet<>(event.getMembers()); Set<Member> copy = new HashSet<>(event.getMembers());
// Node change events between different protocols should not block each other // Node change events between different protocols should not block each other.
// and we use a single thread pool to inform the consistency layer of node changes,
// to avoid multiple tasks simultaneously carrying out the consistency layer of
// node changes operation
if (Objects.nonNull(apProtocol)) { if (Objects.nonNull(apProtocol)) {
ProtocolExecutor.apMemberChange(() -> apProtocol.memberChange(toAPMembersInfo(copy))); ProtocolExecutor.apMemberChange(() -> apProtocol.memberChange(toAPMembersInfo(copy)));
} }

View File

@ -70,4 +70,7 @@ public class IdGeneratorManager {
"ID resource for the time being."); "ID resource for the time being.");
} }
public Map<String, IdGenerator> getGeneratorMap() {
return generatorMap;
}
} }

View File

@ -70,13 +70,12 @@ public class JRaftMaintainService {
if (node == null) { if (node == null) {
return RestResultUtils.failed("not this raft group : " + groupId); return RestResultUtils.failed("not this raft group : " + groupId);
} }
final Configuration conf = node.getOptions().getInitialConf();
final String command = args.keySet().iterator().next(); final String command = args.keySet().iterator().next();
JRaftOps commandEnums = JRaftOps.sourceOf(command); JRaftOps ops = JRaftOps.sourceOf(command);
if (Objects.isNull(commandEnums)) { if (Objects.isNull(ops)) {
return RestResultUtils.failed("Not support command"); return RestResultUtils.failed("Not support command");
} }
return commandEnums.execute(cliService, groupId, node, args); return ops.execute(cliService, groupId, node, args);
} }
catch (Throwable ex) { catch (Throwable ex) {
return RestResultUtils.failed(ex.getMessage()); return RestResultUtils.failed(ex.getMessage());

View File

@ -17,7 +17,7 @@
package com.alibaba.nacos.core.distributed.raft; package com.alibaba.nacos.core.distributed.raft;
import com.alibaba.nacos.common.model.RestResult; import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ConvertUtils; import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.consistency.LogFuture; import com.alibaba.nacos.consistency.LogFuture;
import com.alibaba.nacos.consistency.ProtocolMetaData; import com.alibaba.nacos.consistency.ProtocolMetaData;
import com.alibaba.nacos.consistency.cp.CPProtocol; import com.alibaba.nacos.consistency.cp.CPProtocol;
@ -109,12 +109,10 @@ public class JRaftProtocol
private Node raftNode; private Node raftNode;
private ServerMemberManager memberManager; private ServerMemberManager memberManager;
private String selfAddress = InetUtils.getSelfIp(); private String selfAddress = InetUtils.getSelfIp();
private int failoverRetries = 1;
private String failoverRetriesStr = String.valueOf(failoverRetries);
public JRaftProtocol(ServerMemberManager memberManager) throws Exception { public JRaftProtocol(ServerMemberManager memberManager) throws Exception {
this.memberManager = memberManager; this.memberManager = memberManager;
this.raftServer = new JRaftServer(failoverRetries); this.raftServer = new JRaftServer();
this.jRaftMaintainService = new JRaftMaintainService(raftServer); this.jRaftMaintainService = new JRaftMaintainService(raftServer);
} }
@ -124,10 +122,6 @@ public class JRaftProtocol
this.raftConfig = config; this.raftConfig = config;
this.selfAddress = memberManager.getSelf().getAddress(); this.selfAddress = memberManager.getSelf().getAddress();
NotifyCenter.registerToSharePublisher(RaftEvent.class); NotifyCenter.registerToSharePublisher(RaftEvent.class);
this.failoverRetries = ConvertUtils
.toInt(config.getVal(RaftSysConstants.REQUEST_FAILOVER_RETRIES), 1);
this.failoverRetriesStr = String.valueOf(failoverRetries);
this.raftServer.setFailoverRetries(failoverRetries);
this.raftServer.init(this.raftConfig); this.raftServer.init(this.raftConfig);
this.raftServer.start(); this.raftServer.start();
@ -178,10 +172,7 @@ public class JRaftProtocol
@Override @Override
public GetResponse getData(GetRequest request) throws Exception { public GetResponse getData(GetRequest request) throws Exception {
int retryCnt = Integer.parseInt( return raftServer.get(request);
request.getExtendInfoOrDefault(RaftSysConstants.REQUEST_FAILOVER_RETRIES,
failoverRetriesStr));
return raftServer.get(request, retryCnt);
} }
@Override @Override
@ -193,16 +184,12 @@ public class JRaftProtocol
@Override @Override
public CompletableFuture<LogFuture> submitAsync(Log data) { public CompletableFuture<LogFuture> submitAsync(Log data) {
int retryCnt = Integer.parseInt(
data.getExtendInfoOrDefault(RaftSysConstants.REQUEST_FAILOVER_RETRIES,
failoverRetriesStr));
final Throwable[] throwable = new Throwable[] { null }; final Throwable[] throwable = new Throwable[] { null };
CompletableFuture<LogFuture> future = new CompletableFuture<>(); CompletableFuture<LogFuture> future = new CompletableFuture<>();
try { try {
CompletableFuture<Object> f = new CompletableFuture<>(); CompletableFuture<Object> f = new CompletableFuture<>();
raftServer.commit(JRaftUtils raftServer.commit(JRaftUtils
.injectExtendInfo(data, JRaftLogOperation.MODIFY_OPERATION), f, .injectExtendInfo(data, JRaftLogOperation.MODIFY_OPERATION), f).whenComplete(new BiConsumer<Object, Throwable>() {
retryCnt).whenComplete(new BiConsumer<Object, Throwable>() {
@Override @Override
public void accept(Object o, Throwable throwable) { public void accept(Object o, Throwable throwable) {
future.complete(LogFuture.create(o, throwable)); future.complete(LogFuture.create(o, throwable));
@ -220,7 +207,14 @@ public class JRaftProtocol
@Override @Override
public void memberChange(Set<String> addresses) { public void memberChange(Set<String> addresses) {
this.raftConfig.setMembers(raftConfig.getSelfMember(), addresses); for (int i = 0; i < 5; i ++) {
if (this.raftServer.peerChange(jRaftMaintainService, addresses)) {
break;
} else {
Loggers.RAFT.warn("peer removal failed");
}
ThreadUtils.sleep(100L);
}
} }
@Override @Override

View File

@ -65,22 +65,27 @@ import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService; import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.BytesUtil;
import com.google.common.base.Joiner;
import org.slf4j.Logger;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
/** /**
@ -138,8 +143,7 @@ public class JRaftServer {
System.getProperties().setProperty("bolt.netty.buffer.high.watermark", String.valueOf(256 * 1024 * 1024)); System.getProperties().setProperty("bolt.netty.buffer.high.watermark", String.valueOf(256 * 1024 * 1024));
} }
public JRaftServer(final int failoverRetries) throws Exception { public JRaftServer() throws Exception {
this.failoverRetries = failoverRetries;
this.conf = new Configuration(); this.conf = new Configuration();
} }
@ -243,21 +247,6 @@ public class JRaftServer {
NodeOptions copy = nodeOptions.copy(); NodeOptions copy = nodeOptions.copy();
JRaftUtils.initDirectory(parentPath, groupName, copy); JRaftUtils.initDirectory(parentPath, groupName, copy);
if (!registerSelfToCluster(groupName, localPeerId, configuration.copy())) {
// If the registration fails, you need to remove yourself first and then
// turn on the repeat registration logic
RaftExecutor.executeByCommon(() -> {
Configuration c = configuration.copy();
c.addPeer(localPeerId);
for (; ; ) {
if (registerSelfToCluster(groupName, localPeerId, c)) {
break;
}
ThreadUtils.sleep(1000L);
}
});
}
// Here, the LogProcessor is passed into StateMachine, and when the StateMachine // Here, the LogProcessor is passed into StateMachine, and when the StateMachine
// triggers onApply, the onApply of the LogProcessor is actually called // triggers onApply, the onApply of the LogProcessor is actually called
NacosStateMachine machine = new NacosStateMachine(this, processor); NacosStateMachine machine = new NacosStateMachine(this, processor);
@ -284,6 +273,8 @@ public class JRaftServer {
machine.setNode(node); machine.setNode(node);
RouteTable.getInstance().updateConfiguration(groupName, configuration); RouteTable.getInstance().updateConfiguration(groupName, configuration);
RaftExecutor.executeByCommon(() -> registerSelfToCluster(groupName, localPeerId, configuration));
// Turn on the leader auto refresh for this group // Turn on the leader auto refresh for this group
Random random = new Random(); Random random = new Random();
long period = nodeOptions.getElectionTimeoutMs() + random.nextInt(5 * 1000); long period = nodeOptions.getElectionTimeoutMs() + random.nextInt(5 * 1000);
@ -294,7 +285,7 @@ public class JRaftServer {
} }
} }
GetResponse get(final GetRequest request, final int failoverRetries) { GetResponse get(final GetRequest request) {
final String group = request.getGroup(); final String group = request.getGroup();
CompletableFuture<GetResponse> future = new CompletableFuture<>(); CompletableFuture<GetResponse> future = new CompletableFuture<>();
final RaftGroupTuple tuple = findTupleByGroup(group); final RaftGroupTuple tuple = findTupleByGroup(group);
@ -323,7 +314,8 @@ public class JRaftServer {
try { try {
return future.get(rpcRequestTimeoutMs, TimeUnit.MILLISECONDS); return future.get(rpcRequestTimeoutMs, TimeUnit.MILLISECONDS);
} }
catch (TimeoutException e) { catch (Throwable e) {
Loggers.RAFT.warn("Raft linear read failed, go to Leader read logic : {}", e.toString());
// run raft read // run raft read
readThrouthRaftLog(request, future); readThrouthRaftLog(request, future);
try { try {
@ -331,12 +323,9 @@ public class JRaftServer {
} }
catch (Throwable ex) { catch (Throwable ex) {
throw new ConsistencyException( throw new ConsistencyException(
"Data acquisition failed : " + e.toString()); "Data acquisition failed : " + e.toString() + ", read from leader has error : " + ex.toString());
} }
} }
catch (Throwable e) {
throw new ConsistencyException("Data acquisition failed : " + e.toString());
}
} }
public void readThrouthRaftLog(final GetRequest request, public void readThrouthRaftLog(final GetRequest request,
@ -347,7 +336,7 @@ public class JRaftServer {
.putExtendInfo(JRaftConstants.JRAFT_EXTEND_INFO_KEY, .putExtendInfo(JRaftConstants.JRAFT_EXTEND_INFO_KEY,
JRaftLogOperation.READ_OPERATION).build(); JRaftLogOperation.READ_OPERATION).build();
CompletableFuture<byte[]> f = new CompletableFuture<byte[]>(); CompletableFuture<byte[]> f = new CompletableFuture<byte[]>();
commit(readLog, f, failoverRetries) commit(readLog, f)
.whenComplete(new BiConsumer<byte[], Throwable>() { .whenComplete(new BiConsumer<byte[], Throwable>() {
@Override @Override
public void accept(byte[] result, Throwable throwable) { public void accept(byte[] result, Throwable throwable) {
@ -373,8 +362,7 @@ public class JRaftServer {
}); });
} }
public <T> CompletableFuture<T> commit(Log data, final CompletableFuture<T> future, public <T> CompletableFuture<T> commit(Log data, final CompletableFuture<T> future) {
final int retryLeft) {
LoggerUtils LoggerUtils
.printIfDebugEnabled(Loggers.RAFT, "data requested this time : {}", data); .printIfDebugEnabled(Loggers.RAFT, "data requested this time : {}", data);
final String group = data.getGroup(); final String group = data.getGroup();
@ -384,8 +372,7 @@ public class JRaftServer {
"No corresponding Raft Group found : " + group)); "No corresponding Raft Group found : " + group));
return future; return future;
} }
RetryRunner runner = () -> commit(data, future, retryLeft - 1); FailoverClosureImpl closure = new FailoverClosureImpl(future);
FailoverClosureImpl closure = new FailoverClosureImpl(future, retryLeft, runner);
final Node node = tuple.node; final Node node = tuple.node;
if (node.isLeader()) { if (node.isLeader()) {
@ -399,36 +386,27 @@ public class JRaftServer {
return future; return future;
} }
boolean registerSelfToCluster(String groupId, PeerId selfIp, Configuration conf) { /**
conf.removePeer(localPeerId); * Add yourself to the Raft cluster
PeerId leader = new PeerId(); *
for (int i = 0; i < 5; i++) { * @param groupId raft group
Status status = cliService.getLeader(groupId, conf, leader); * @param selfIp local raft node address
if (status.isOk()) { * @param conf {@link Configuration} without self info
break; * @return join success
*/
void registerSelfToCluster(String groupId, PeerId selfIp, Configuration conf) {
for ( ; ; ) {
List<PeerId> peerIds = cliService.getPeers(groupId, conf);
if (peerIds.contains(selfIp)) {
return;
} }
ThreadUtils.sleep(100L);
Loggers.RAFT.warn("get leader failed : {}", status);
}
// This means that this is a new cluster, following the normal initialization logic
if (leader.isEmpty()) {
return true;
}
for (int i = 0; i < 5; i++) {
Status status = cliService.addPeer(groupId, conf, selfIp); Status status = cliService.addPeer(groupId, conf, selfIp);
if (status.isOk()) { if (status.isOk()) {
Loggers.RAFT.info("reigister self to cluster success"); return;
return true;
}
else {
Loggers.RAFT.error("register self to cluster has error : {}", status);
ThreadUtils.sleep(1_000L);
} }
Loggers.RAFT.warn("Failed to join the cluster, retry...");
ThreadUtils.sleep(1_000L);
} }
return false;
} }
protected PeerId getLeader(final String raftGroupId) { protected PeerId getLeader(final String raftGroupId) {
@ -535,6 +513,37 @@ public class JRaftServer {
} }
} }
boolean peerChange(JRaftMaintainService maintainService, Set<String> newPeers) {
Set<String> oldPeers = this.raftConfig.getMembers();
oldPeers.remove(newPeers);
if (oldPeers.isEmpty()) {
return true;
}
Set<String> waitRemove = oldPeers;
AtomicInteger successCnt = new AtomicInteger(0);
multiRaftGroup.forEach(new BiConsumer<String, RaftGroupTuple>() {
@Override
public void accept(String group, RaftGroupTuple tuple) {
final Node node = tuple.getNode();
if (!node.isLeader()) {
return;
}
Map<String, String> params = new HashMap<>();
params.put(JRaftConstants.GROUP_ID, group);
params.put(JRaftConstants.REMOVE_PEERS, Joiner.on(",").join(waitRemove));
RestResult<String> result = maintainService.execute(params);
if (result.ok()) {
successCnt.incrementAndGet();
}
}
});
this.raftConfig.setMembers(localPeerId.toString(), newPeers);
return successCnt.get() == multiRaftGroup.size();
}
void refreshRouteTable(String group) { void refreshRouteTable(String group) {
if (isShutdown) { if (isShutdown) {
return; return;

View File

@ -17,10 +17,7 @@
package com.alibaba.nacos.core.distributed.raft.utils; package com.alibaba.nacos.core.distributed.raft.utils;
import com.alibaba.nacos.consistency.exception.ConsistencyException; import com.alibaba.nacos.consistency.exception.ConsistencyException;
import com.alibaba.nacos.core.distributed.raft.exception.NoLeaderException;
import com.alibaba.nacos.core.utils.Loggers;
import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.Status;
import java.io.IOException;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -32,17 +29,11 @@ import java.util.concurrent.CompletableFuture;
public class FailoverClosureImpl<T> implements FailoverClosure<T> { public class FailoverClosureImpl<T> implements FailoverClosure<T> {
private final CompletableFuture<T> future; private final CompletableFuture<T> future;
private final int retriesLeft;
private final RetryRunner retryRunner;
private volatile T data; private volatile T data;
private volatile Throwable throwable; private volatile Throwable throwable;
public FailoverClosureImpl(final CompletableFuture<T> future, public FailoverClosureImpl(final CompletableFuture<T> future) {
final int retriesLeft,
final RetryRunner retryRunner) {
this.future = future; this.future = future;
this.retriesLeft = retriesLeft;
this.retryRunner = retryRunner;
} }
@Override @Override
@ -62,31 +53,11 @@ public class FailoverClosureImpl<T> implements FailoverClosure<T> {
return; return;
} }
final Throwable throwable = this.throwable; final Throwable throwable = this.throwable;
if (retriesLeft >= 0 && canRetryException(throwable)) { if (Objects.nonNull(throwable)) {
Loggers.RAFT.warn("[Failover] status: {}, error: {}, [{}] retries left.", status, future.completeExceptionally(new ConsistencyException(throwable));
throwable, this.retriesLeft);
this.retryRunner.run();
} else { } else {
if (this.retriesLeft <= 0) { future.completeExceptionally(new ConsistencyException("operation failure"));
Loggers.RAFT.error("[InvalidEpoch-Failover] status: {}, error: {}, {} retries left.",
status, throwable,
this.retriesLeft);
}
if (Objects.nonNull(throwable)) {
future.completeExceptionally(new ConsistencyException(throwable));
} else {
future.completeExceptionally(new ConsistencyException("Maximum number of retries has been reached"));
}
} }
} }
protected boolean canRetryException(Throwable throwable) {
if (throwable == null) {
return false;
}
if (throwable instanceof NoLeaderException) {
return true;
}
return throwable instanceof IOException;
}
} }

View File

@ -34,4 +34,8 @@ public class JRaftConstants {
public static final String REMOVE_PEER = "removePeer"; public static final String REMOVE_PEER = "removePeer";
public static final String REMOVE_PEERS = "removePeers";
public static final String CHANGE_PEERS = "changePeers";
} }

View File

@ -97,6 +97,46 @@ public enum JRaftOps {
} }
}, },
REMOVE_PEERS("removePeers") {
@Override
public RestResult<String> execute(CliService cliService, String groupId,
Node node, Map<String, String> args) {
final Configuration conf = node.getOptions().getInitialConf();
final String peers = args.get(JRaftConstants.REMOVE_PEERS);
for (String s : peers.split(",")) {
final PeerId waitRemove = PeerId.parsePeer(s);
Status status = cliService.removePeer(groupId, conf, waitRemove);
if (!status.isOk()) {
return RestResultUtils.failed(status.getErrorMsg());
}
}
return RestResultUtils.success();
}
},
CHANGE_PEERS("changePeers") {
@Override
public RestResult<String> execute(CliService cliService, String groupId,
Node node, Map<String, String> args) {
final Configuration conf = node.getOptions().getInitialConf();
final Configuration newConf = new Configuration();
String peers = args.get(JRaftConstants.CHANGE_PEERS);
for (String peer : peers.split(",")) {
newConf.addPeer(PeerId.parsePeer(peer.trim()));
}
if (Objects.equals(conf, newConf)) {
return RestResultUtils.success();
}
Status status = cliService.changePeers(groupId, conf, newConf);
if (status.isOk()) {
return RestResultUtils.success();
}
return RestResultUtils.failed(status.getErrorMsg());
}
}
; ;
private String name; private String name;