Merge pull request #2981 from alibaba/hotfix_jraft

[HOTFIX] Raft RPC being submitted to the Leader, error could not be…
This commit is contained in:
杨翊 SionYang 2020-06-07 19:53:32 +08:00 committed by GitHub
commit c2e3530ee2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 238 additions and 147 deletions

View File

@ -160,17 +160,8 @@ public class EmbeddedDumpService extends DumpService {
@Override @Override
protected boolean canExecute() { protected boolean canExecute() {
try { // if is derby + raft mode, only leader can execute
// if is derby + raft mode, only leader can execute CPProtocol protocol = protocolManager.getCpProtocol();
CPProtocol protocol = protocolManager.getCpProtocol(); return protocol.isLeader(Constants.CONFIG_MODEL_RAFT_GROUP);
return protocol.isLeader(Constants.CONFIG_MODEL_RAFT_GROUP);
}
catch (NoSuchRaftGroupException e) {
return true;
}
catch (Throwable e) {
// It's impossible to get to this point
throw new RuntimeException(e);
}
} }
} }

View File

@ -26,7 +26,9 @@ import com.alibaba.nacos.config.server.utils.ContentUtils;
import com.alibaba.nacos.config.server.utils.PropertyUtil; import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.config.server.utils.TimeUtils; import com.alibaba.nacos.config.server.utils.TimeUtils;
import com.alibaba.nacos.consistency.cp.CPProtocol; import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.distributed.raft.exception.NoSuchRaftGroupException; import com.alibaba.nacos.core.distributed.raft.exception.NoSuchRaftGroupException;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.InetUtils; import com.alibaba.nacos.core.utils.InetUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -53,8 +55,6 @@ public class MergeDatumService {
static final AtomicInteger FINISHED = new AtomicInteger(); static final AtomicInteger FINISHED = new AtomicInteger();
static int total = 0; static int total = 0;
private CPProtocol protocol;
@Autowired @Autowired
public MergeDatumService(PersistService persistService) { public MergeDatumService(PersistService persistService) {
this.persistService = persistService; this.persistService = persistService;
@ -110,7 +110,8 @@ public class MergeDatumService {
return true; return true;
} }
try { try {
return protocol.isLeader(Constants.CONFIG_MODEL_RAFT_GROUP); ProtocolManager protocolManager = ApplicationUtils.getBean(ProtocolManager.class);
return protocolManager.getCpProtocol().isLeader(Constants.CONFIG_MODEL_RAFT_GROUP);
} catch (NoSuchRaftGroupException e) { } catch (NoSuchRaftGroupException e) {
return true; return true;
} catch (Exception e) { } catch (Exception e) {

View File

@ -30,8 +30,7 @@ public interface CPProtocol<C extends Config, P extends LogProcessor4CP> extends
* *
* @param group business module info * @param group business module info
* @return is leader * @return is leader
* @throws Exception
*/ */
boolean isLeader(String group) throws Exception; boolean isLeader(String group);
} }

View File

@ -209,7 +209,7 @@ public class JRaftProtocol
} }
@Override @Override
public boolean isLeader(String group) throws Exception { public boolean isLeader(String group) {
Node node = raftServer.findNodeByGroup(group); Node node = raftServer.findNodeByGroup(group);
if (node == null) { if (node == null) {
throw new NoSuchRaftGroupException(group); throw new NoSuchRaftGroupException(group);

View File

@ -16,10 +16,10 @@
package com.alibaba.nacos.core.distributed.raft; package com.alibaba.nacos.core.distributed.raft;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.model.RestResult; import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ConvertUtils; import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.LoggerUtils; import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.consistency.LogProcessor; import com.alibaba.nacos.consistency.LogProcessor;
import com.alibaba.nacos.consistency.SerializeFactory; import com.alibaba.nacos.consistency.SerializeFactory;
@ -39,11 +39,8 @@ import com.alibaba.nacos.core.distributed.raft.utils.JRaftUtils;
import com.alibaba.nacos.core.distributed.raft.utils.RaftExecutor; import com.alibaba.nacos.core.distributed.raft.utils.RaftExecutor;
import com.alibaba.nacos.core.distributed.raft.utils.RaftOptionsBuilder; import com.alibaba.nacos.core.distributed.raft.utils.RaftOptionsBuilder;
import com.alibaba.nacos.core.monitor.MetricsMonitor; import com.alibaba.nacos.core.monitor.MetricsMonitor;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.ClassUtils;
import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.core.utils.TimerContext;
import com.alipay.sofa.jraft.CliService; import com.alipay.sofa.jraft.CliService;
import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService; import com.alipay.sofa.jraft.RaftGroupService;
@ -64,8 +61,8 @@ import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl; import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Endpoint;
import com.google.protobuf.Message;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.protobuf.Message;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -138,11 +135,13 @@ public class JRaftServer {
// System.getProperties().setProperty("bolt.channel_write_buf_low_water_mark", String.valueOf(64 * 1024 * 1024)); // System.getProperties().setProperty("bolt.channel_write_buf_low_water_mark", String.valueOf(64 * 1024 * 1024));
// System.getProperties().setProperty("bolt.channel_write_buf_high_water_mark", String.valueOf(256 * 1024 * 1024)); // System.getProperties().setProperty("bolt.channel_write_buf_high_water_mark", String.valueOf(256 * 1024 * 1024));
System.getProperties().setProperty("bolt.netty.buffer.low.watermark", String.valueOf(128 * 1024 * 1024)); System.getProperties().setProperty("bolt.netty.buffer.low.watermark",
System.getProperties().setProperty("bolt.netty.buffer.high.watermark", String.valueOf(256 * 1024 * 1024)); String.valueOf(128 * 1024 * 1024));
System.getProperties().setProperty("bolt.netty.buffer.high.watermark",
String.valueOf(256 * 1024 * 1024));
} }
public JRaftServer() throws Exception { public JRaftServer() {
this.conf = new Configuration(); this.conf = new Configuration();
} }
@ -272,7 +271,8 @@ public class JRaftServer {
machine.setNode(node); machine.setNode(node);
RouteTable.getInstance().updateConfiguration(groupName, configuration); RouteTable.getInstance().updateConfiguration(groupName, configuration);
RaftExecutor.executeByCommon(() -> registerSelfToCluster(groupName, localPeerId, configuration)); RaftExecutor.executeByCommon(
() -> registerSelfToCluster(groupName, localPeerId, configuration));
// Turn on the leader auto refresh for this group // Turn on the leader auto refresh for this group
Random random = new Random(); Random random = new Random();
@ -305,21 +305,25 @@ public class JRaftServer {
} }
catch (Throwable t) { catch (Throwable t) {
MetricsMonitor.raftReadIndexFailed(); MetricsMonitor.raftReadIndexFailed();
future.completeExceptionally(new ConsistencyException("The conformance protocol is temporarily unavailable for reading", t)); future.completeExceptionally(new ConsistencyException(
"The conformance protocol is temporarily unavailable for reading",
t));
} }
return; return;
} }
MetricsMonitor.raftReadIndexFailed(); MetricsMonitor.raftReadIndexFailed();
Loggers.RAFT.error("ReadIndex has error : {}", status.getErrorMsg()); Loggers.RAFT.error("ReadIndex has error : {}", status.getErrorMsg());
future.completeExceptionally( future.completeExceptionally(new ConsistencyException(
new ConsistencyException("The conformance protocol is temporarily unavailable for reading, " + status.getErrorMsg())); "The conformance protocol is temporarily unavailable for reading, "
+ status.getErrorMsg()));
} }
}); });
return future; return future;
} }
catch (Throwable e) { catch (Throwable e) {
MetricsMonitor.raftReadFromLeader(); MetricsMonitor.raftReadFromLeader();
Loggers.RAFT.warn("Raft linear read failed, go to Leader read logic : {}", e.toString()); Loggers.RAFT.warn("Raft linear read failed, go to Leader read logic : {}",
e.toString());
// run raft read // run raft read
readFromLeader(request, future); readFromLeader(request, future);
return future; return future;
@ -333,20 +337,25 @@ public class JRaftServer {
@Override @Override
public void accept(Response response, Throwable throwable) { public void accept(Response response, Throwable throwable) {
if (Objects.nonNull(throwable)) { if (Objects.nonNull(throwable)) {
future.completeExceptionally(new ConsistencyException("The conformance protocol is temporarily unavailable for reading", throwable)); future.completeExceptionally(new ConsistencyException(
"The conformance protocol is temporarily unavailable for reading",
throwable));
return; return;
} }
if (response.getSuccess()) { if (response.getSuccess()) {
future.complete(response); future.complete(response);
} else { }
future.completeExceptionally( else {
new ConsistencyException("The conformance protocol is temporarily unavailable for reading, " + response.getErrMsg())); future.completeExceptionally(new ConsistencyException(
"The conformance protocol is temporarily unavailable for reading, "
+ response.getErrMsg()));
} }
} }
}); });
} }
public CompletableFuture<Response> commit(final String group, final Message data, final CompletableFuture<Response> future) { public CompletableFuture<Response> commit(final String group, final Message data,
final CompletableFuture<Response> future) {
LoggerUtils LoggerUtils
.printIfDebugEnabled(Loggers.RAFT, "data requested this time : {}", data); .printIfDebugEnabled(Loggers.RAFT, "data requested this time : {}", data);
final RaftGroupTuple tuple = findTupleByGroup(group); final RaftGroupTuple tuple = findTupleByGroup(group);
@ -374,12 +383,12 @@ public class JRaftServer {
* Add yourself to the Raft cluster * Add yourself to the Raft cluster
* *
* @param groupId raft group * @param groupId raft group
* @param selfIp local raft node address * @param selfIp local raft node address
* @param conf {@link Configuration} without self info * @param conf {@link Configuration} without self info
* @return join success * @return join success
*/ */
void registerSelfToCluster(String groupId, PeerId selfIp, Configuration conf) { void registerSelfToCluster(String groupId, PeerId selfIp, Configuration conf) {
for ( ; ; ) { for (; ; ) {
List<PeerId> peerIds = cliService.getPeers(groupId, conf); List<PeerId> peerIds = cliService.getPeers(groupId, conf);
if (peerIds.contains(selfIp)) { if (peerIds.contains(selfIp)) {
return; return;
@ -428,14 +437,10 @@ public class JRaftServer {
public void applyOperation(Node node, Message data, FailoverClosure closure) { public void applyOperation(Node node, Message data, FailoverClosure closure) {
final Task task = new Task(); final Task task = new Task();
task.setDone(new NacosClosure(data, status -> { task.setDone(new NacosClosure(data, status -> {
NacosClosure.NStatus nStatus = (NacosClosure.NStatus) status; NacosClosure.NacosStatus nacosStatus = (NacosClosure.NacosStatus) status;
if (Objects.nonNull(nStatus.getThrowable())) { closure.setThrowable(nacosStatus.getThrowable());
closure.setThrowable(nStatus.getThrowable()); closure.setResponse(nacosStatus.getResponse());
} closure.run(nacosStatus);
else {
closure.setData(nStatus.getResult());
}
closure.run(nStatus);
})); }));
task.setData(ByteBuffer.wrap(data.toByteArray())); task.setData(ByteBuffer.wrap(data.toByteArray()));
node.apply(task); node.apply(task);
@ -452,10 +457,11 @@ public class JRaftServer {
public void complete(Object o, Throwable ex) { public void complete(Object o, Throwable ex) {
if (Objects.nonNull(ex)) { if (Objects.nonNull(ex)) {
closure.setThrowable(ex); closure.setThrowable(ex);
closure.run(new Status(RaftError.UNKNOWN, ex.getMessage())); closure.run(
new Status(RaftError.UNKNOWN, ex.getMessage()));
return; return;
} }
closure.setData(o); closure.setResponse((Response) o);
closure.run(Status.OK()); closure.run(Status.OK());
} }
@ -491,7 +497,8 @@ public class JRaftServer {
RestResult<String> result = maintainService.execute(params); RestResult<String> result = maintainService.execute(params);
if (result.ok()) { if (result.ok()) {
successCnt.incrementAndGet(); successCnt.incrementAndGet();
} else { }
else {
Loggers.RAFT.error("Node removal failed : {}", result); Loggers.RAFT.error("Node removal failed : {}", result);
} }
} }
@ -513,7 +520,8 @@ public class JRaftServer {
Configuration oldConf = instance.getConfiguration(groupName); Configuration oldConf = instance.getConfiguration(groupName);
String oldLeader = Optional.ofNullable(instance.selectLeader(groupName)) String oldLeader = Optional.ofNullable(instance.selectLeader(groupName))
.orElse(PeerId.emptyPeer()).getEndpoint().toString(); .orElse(PeerId.emptyPeer()).getEndpoint().toString();
status = instance.refreshConfiguration(this.cliClientService, groupName, rpcRequestTimeoutMs); status = instance.refreshConfiguration(this.cliClientService, groupName,
rpcRequestTimeoutMs);
if (!status.isOk()) { if (!status.isOk()) {
Loggers.RAFT Loggers.RAFT
.error("Fail to refresh route configuration for group : {}, status is : {}", .error("Fail to refresh route configuration for group : {}, status is : {}",
@ -548,12 +556,16 @@ public class JRaftServer {
return cliService; return cliService;
} }
public class RaftGroupTuple { public static class RaftGroupTuple {
private final LogProcessor processor; private LogProcessor processor;
private final Node node; private Node node;
private final RaftGroupService raftGroupService; private RaftGroupService raftGroupService;
private final NacosStateMachine machine; private NacosStateMachine machine;
@JustForTest
public RaftGroupTuple() {
}
public RaftGroupTuple(Node node, LogProcessor processor, public RaftGroupTuple(Node node, LogProcessor processor,
RaftGroupService raftGroupService, NacosStateMachine machine) { RaftGroupService raftGroupService, NacosStateMachine machine) {

View File

@ -16,8 +16,10 @@
package com.alibaba.nacos.core.distributed.raft; package com.alibaba.nacos.core.distributed.raft;
import com.alibaba.nacos.consistency.entity.Response;
import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.google.protobuf.Message; import com.google.protobuf.Message;
/** /**
@ -25,76 +27,113 @@ import com.google.protobuf.Message;
*/ */
public class NacosClosure implements Closure { public class NacosClosure implements Closure {
private final Message log; private final Message message;
private final Closure closure; private final Closure closure;
private Throwable throwable; private final NacosStatus nacosStatus = new NacosStatus();
private Object object;
public NacosClosure(Message log, Closure closure) { public NacosClosure(Message message, Closure closure) {
this.log = log; this.message = message;
this.closure = closure; this.closure = closure;
} }
@Override @Override
public void run(Status status) { public void run(Status status) {
if (closure != null) { nacosStatus.setStatus(status);
NStatus status1 = new NStatus(status, throwable); closure.run(nacosStatus);
status1.setResult(object);
closure.run(status1);
}
} }
public Object getObject() { public void setResponse(Response response) {
return object; this.nacosStatus.setResponse(response);
}
public void setObject(Object object) {
this.object = object;
} }
public void setThrowable(Throwable throwable) { public void setThrowable(Throwable throwable) {
this.throwable = throwable; this.nacosStatus.setThrowable(throwable);
} }
public Closure getClosure() { public Message getMessage() {
return closure; return message;
}
public Message getLog() {
return log;
} }
// Pass the Throwable inside the state machine to the outer layer // Pass the Throwable inside the state machine to the outer layer
@SuppressWarnings("PMD.ClassNamingShouldBeCamelRule") @SuppressWarnings("PMD.ClassNamingShouldBeCamelRule")
public static class NStatus extends Status { public static class NacosStatus extends Status {
private Status status; private Status status;
private Object result; private Response response = null;
private Throwable throwable; private Throwable throwable = null;
public NStatus(Status status, Throwable throwable) {
super();
this.status = status;
this.throwable = throwable;
}
public Status getStatus() {
return status;
}
public void setStatus(Status status) { public void setStatus(Status status) {
this.status = status; this.status = status;
} }
public Object getResult() { @Override
return result; public void reset() {
status.reset();
} }
public void setResult(Object result) { @Override
this.result = result; public boolean isOk() {
return status.isOk();
}
@Override
public void setCode(int code) {
status.setCode(code);
}
@Override
public int getCode() {
return status.getCode();
}
@Override
public RaftError getRaftError() {
return status.getRaftError();
}
@Override
public void setErrorMsg(String errMsg) {
status.setErrorMsg(errMsg);
}
@Override
public void setError(int code, String fmt, Object... args) {
status.setError(code, fmt, args);
}
@Override
public void setError(RaftError error, String fmt, Object... args) {
status.setError(error, fmt, args);
}
@Override
public String toString() {
return status.toString();
}
@Override
public Status copy() {
NacosStatus copy = new NacosStatus();
copy.status = this.status;
copy.response = this.response;
copy.throwable = this.throwable;
return copy;
}
@Override
public String getErrorMsg() {
return status.getErrorMsg();
}
public Response getResponse() {
return response;
}
public void setResponse(Response response) {
this.response = response;
} }
public Throwable getThrowable() { public Throwable getThrowable() {

View File

@ -32,7 +32,6 @@ import com.alibaba.nacos.consistency.snapshot.Writer;
import com.alibaba.nacos.core.distributed.raft.utils.JRaftUtils; import com.alibaba.nacos.core.distributed.raft.utils.JRaftUtils;
import com.alibaba.nacos.core.notify.NotifyCenter; import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.core.utils.TimerContext;
import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator; import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.Node;
@ -94,7 +93,7 @@ class NacosStateMachine extends StateMachineAdapter {
try { try {
if (iter.done() != null) { if (iter.done() != null) {
closure = (NacosClosure) iter.done(); closure = (NacosClosure) iter.done();
message = closure.getLog(); message = closure.getMessage();
} }
else { else {
final ByteBuffer data = iter.getData(); final ByteBuffer data = iter.getData();
@ -246,9 +245,9 @@ class NacosStateMachine extends StateMachineAdapter {
RouteTable.getInstance().getConfiguration(node.getGroupId()).getPeers()); RouteTable.getInstance().getConfiguration(node.getGroupId()).getPeers());
} }
private void postProcessor(Object data, NacosClosure closure) { private void postProcessor(Response data, NacosClosure closure) {
if (Objects.nonNull(closure)) { if (Objects.nonNull(closure)) {
closure.setObject(data); closure.setResponse(data);
} }
} }

View File

@ -19,7 +19,7 @@ package com.alibaba.nacos.core.distributed.raft.exception;
/** /**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/ */
public class NoSuchRaftGroupException extends Exception { public class NoSuchRaftGroupException extends RuntimeException {
private static final long serialVersionUID = 1755681688785678765L; private static final long serialVersionUID = 1755681688785678765L;

View File

@ -71,13 +71,13 @@ public abstract class AbstractProcessor {
} }
protected void execute(JRaftServer server, final RpcContext asyncCtx, final Message log, final JRaftServer.RaftGroupTuple tuple) { protected void execute(JRaftServer server, final RpcContext asyncCtx, final Message log, final JRaftServer.RaftGroupTuple tuple) {
FailoverClosure<Object> closure = new FailoverClosure<Object>() { FailoverClosure closure = new FailoverClosure() {
Object data; Response data;
Throwable ex; Throwable ex;
@Override @Override
public void setData(Object data) { public void setResponse(Response data) {
this.data = data; this.data = data;
} }
@ -93,14 +93,7 @@ public abstract class AbstractProcessor {
asyncCtx.sendResponse(Response.newBuilder().setErrMsg(ex.toString()) asyncCtx.sendResponse(Response.newBuilder().setErrMsg(ex.toString())
.setSuccess(false).build()); .setSuccess(false).build());
} else { } else {
ByteString bytes = Objects.nonNull(data) ? ByteString.copyFrom(serializer.serialize(data)) : ByteString.EMPTY; asyncCtx.sendResponse(data);
Response response = Response.newBuilder()
.setSuccess(true)
.setData(bytes)
.build();
asyncCtx.sendResponse(response);
} }
} }
}; };

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.core.distributed.raft.utils; package com.alibaba.nacos.core.distributed.raft.utils;
import com.alibaba.nacos.consistency.entity.Response;
import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Closure;
/** /**
@ -23,14 +24,14 @@ import com.alipay.sofa.jraft.Closure;
* *
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/ */
public interface FailoverClosure<T> extends Closure { public interface FailoverClosure extends Closure {
/** /**
* Set the return interface if needed * Set the return interface if needed
* *
* @param data data * @param response {@link Response} data
*/ */
void setData(T data); void setResponse(Response response);
/** /**
* Catch exception * Catch exception

View File

@ -16,11 +16,11 @@
package com.alibaba.nacos.core.distributed.raft.utils; package com.alibaba.nacos.core.distributed.raft.utils;
import com.alibaba.nacos.common.utils.LoggerUtils; import com.alibaba.nacos.common.utils.Objects;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.exception.ConsistencyException; import com.alibaba.nacos.consistency.exception.ConsistencyException;
import com.alibaba.nacos.core.utils.Loggers;
import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.Status;
import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
/** /**
@ -28,39 +28,36 @@ import java.util.concurrent.CompletableFuture;
* *
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/ */
public class FailoverClosureImpl<T> implements FailoverClosure<T> { public class FailoverClosureImpl implements FailoverClosure {
private final CompletableFuture<T> future; private final CompletableFuture<Response> future;
private volatile T data; private volatile Response data;
private volatile Throwable throwable; private volatile Throwable throwable;
public FailoverClosureImpl(final CompletableFuture<T> future) { public FailoverClosureImpl(final CompletableFuture<Response> future) {
this.future = future; this.future = future;
} }
@Override @Override
public void setData(T data) { public void setResponse(Response data) {
this.data = data; this.data = data;
} }
@Override @Override
public void setThrowable(Throwable throwable) { public void setThrowable(Throwable throwable) {
this.throwable = throwable; this.throwable = throwable;
} }
@Override @Override
public void run(Status status) { public void run(Status status) {
if (status.isOk()) { if (status.isOk()) {
boolean success = future.complete(data); future.complete(data);
LoggerUtils.printIfDebugEnabled(Loggers.RAFT, "future.complete execute {}", success); return;
return; }
} final Throwable throwable = this.throwable;
final Throwable throwable = this.throwable; future.completeExceptionally(Objects.nonNull(throwable) ?
if (Objects.nonNull(throwable)) { new ConsistencyException(throwable.toString()) :
future.completeExceptionally(new ConsistencyException(throwable.toString())); new ConsistencyException("operation failure"));
} else { }
future.completeExceptionally(new ConsistencyException("operation failure"));
}
}
} }

View File

@ -0,0 +1,59 @@
package com.alibaba.nacos.core.distributed.raft.processor;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.entity.Log;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.core.distributed.raft.JRaftServer;
import com.alibaba.nacos.core.distributed.raft.utils.FailoverClosure;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.Connection;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.google.protobuf.Message;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicReference;
public class AbstractProcessorTest {
private JRaftServer server = new JRaftServer() {
@Override
public void applyOperation(Node node, Message data, FailoverClosure closure) {
closure.setResponse(Response.newBuilder().setSuccess(false).setErrMsg("Error message transmission").build());
closure.run(new Status(RaftError.UNKNOWN, "Error message transmission"));
}
};
@Test
public void testErrorThroughRPC() {
final AtomicReference<Response> reference = new AtomicReference<>();
RpcContext context = new RpcContext() {
@Override
public void sendResponse(Object responseObj) {
reference.set((Response) responseObj);
}
@Override
public Connection getConnection() {
return null;
}
@Override
public String getRemoteAddress() {
return null;
}
};
AbstractProcessor processor = new NacosLogProcessor(server, SerializeFactory.getDefault());
processor.execute(server, context, Log.newBuilder().build(), new JRaftServer.RaftGroupTuple());
Response response = reference.get();
Assert.assertNotNull(response);
Assert.assertEquals("Error message transmission", response.getErrMsg());
Assert.assertFalse(response.getSuccess());
}
}