feat: jraft use grpc

This commit is contained in:
chuntaojun 2020-05-10 17:14:06 +08:00
parent 6a6725a269
commit fa5769381c
14 changed files with 66 additions and 358 deletions

1
.gitignore vendored
View File

@ -8,7 +8,6 @@ target
.DS_Store .DS_Store
.factorypath .factorypath
/logs /logs
/lib
*.iml *.iml
node_modules node_modules
test/derby.log test/derby.log

View File

@ -135,9 +135,6 @@ nacos.istio.mcp.server.enabled=false
# nacos.core.member.lookup.type=[file,address-server,discovery] # nacos.core.member.lookup.type=[file,address-server,discovery]
## Set the cluster list with a configuration file or command-line argument ## Set the cluster list with a configuration file or command-line argument
# nacos.member.list=192.168.16.101:8847?raft_port=8807,192.168.16.101?raft_port=8808,192.168.16.101:8849?raft_port=8809 # nacos.member.list=192.168.16.101:8847?raft_port=8807,192.168.16.101?raft_port=8808,192.168.16.101:8849?raft_port=8809
## for DiscoveryMemberLookup
# If you want to use cluster node self-discovery, turn this parameter on
# nacos.member.discovery=false
## for AddressServerMemberLookup ## for AddressServerMemberLookup
# Maximum number of retries to query the address server upon initialization # Maximum number of retries to query the address server upon initialization
# nacos.core.address-server.retry=5 # nacos.core.address-server.retry=5

View File

@ -120,6 +120,13 @@
<dependency> <dependency>
<groupId>com.alipay.sofa</groupId> <groupId>com.alipay.sofa</groupId>
<artifactId>jraft-core</artifactId> <artifactId>jraft-core</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>rpc-grpc-impl</artifactId>
<scope>compile</scope>
</dependency> </dependency>
<dependency> <dependency>
@ -159,6 +166,5 @@
<groupId>org.reflections</groupId> <groupId>org.reflections</groupId>
<artifactId>reflections</artifactId> <artifactId>reflections</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,207 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.cluster.lookup;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.http.HttpClientManager;
import com.alibaba.nacos.common.http.NAsyncHttpClient;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.core.utils.TimerContext;
import com.alibaba.nacos.core.cluster.AbstractMemberLookup;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.Task;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Commons;
import com.alibaba.nacos.core.utils.GenericType;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.Loggers;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* <pre>
*
*
* Member A
* [ip1.port,ip2.port]
*
* DiscoveryMemberLookup
*
*
*
*
*
*
* read init members from
* cluster.conf or
*
*
*
*
*
* Member B
* init discovery task MemberListSyncTask [ip1:port,ip2:port,ip3:port] [ip1:port,ip2.port,ip3.port]
* {adweight:"",site:"",state:""}
*
*
*
* </pre>
*
* <ul>
* <li>{@link MemberListSyncTask} : Cluster node list synchronization tasks</li>
* </ul>
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class DiscoveryMemberLookup extends AbstractMemberLookup {
NAsyncHttpClient asyncHttpClient = HttpClientManager.getAsyncHttpClient();
MemberListSyncTask syncTask;
@Override
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
Collection<Member> tmpMembers = new ArrayList<>();
try {
List<String> tmp = ApplicationUtils.readClusterConf();
tmpMembers.addAll(MemberUtils.readServerConf(tmp));
}
catch (Throwable ex) {
throw new NacosException(NacosException.SERVER_ERROR, ex);
}
afterLookup(tmpMembers);
// Whether to enable the node self-discovery function that comes with nacos
// The reason why instance properties are not used here is so that
// the hot update mechanism can be implemented later
syncTask = new MemberListSyncTask();
GlobalExecutor.scheduleByCommon(syncTask, 5_000L);
}
}
@Override
public void destroy() {
syncTask.shutdown();
}
// Synchronize cluster member list information to a node
class MemberListSyncTask extends Task {
private final GenericType<RestResult<Collection<String>>> reference = new GenericType<RestResult<Collection<String>>>() {
};
@Override
public void executeBody() {
TimerContext.start("MemberListSyncTask");
try {
Collection<Member> kMembers = MemberUtils.kRandom(memberManager.allMembers(), member -> {
// local node or node check failed will not perform task processing
if (!member.check()) {
return false;
}
NodeState state = member.getState();
return !(state == NodeState.DOWN || state == NodeState.SUSPICIOUS);
});
for (Member member : kMembers) {
// If the cluster self-discovery is turned on, the information is synchronized with the node
String url = "http://" + member.getAddress() + ApplicationUtils
.getContextPath() + Commons.NACOS_CORE_CONTEXT
+ "/cluster/simple/nodes";
if (shutdown) {
return;
}
asyncHttpClient.get(url, Header.EMPTY, Query.EMPTY, reference.getType(), new Callback<Collection<String>>() {
@Override
public void onReceive(RestResult<Collection<String>> result) {
if (result.ok()) {
LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER, "success ping to node : {}, result : {}",
member, result);
final Collection<String> data = result.getData();
if (CollectionUtils.isNotEmpty(data)) {
discovery(data);
}
MemberUtils.onSuccess(member);
}
else {
Loggers.CLUSTER
.warn("An exception occurred while reporting their "
+ "information to the node : {}, error : {}",
member.getAddress(),
result.getMessage());
MemberUtils.onFail(member);
}
}
@Override
public void onError(Throwable e) {
Loggers.CLUSTER
.error("An exception occurred while reporting their "
+ "information to the node : {}, error : {}",
member.getAddress(), e.getMessage());
MemberUtils.onFail(member, e);
}
});
}
}
catch (Exception e) {
Loggers.CLUSTER.error("node state report task has error : {}", e.getMessage());
}
finally {
TimerContext.end(Loggers.CLUSTER);
}
}
@Override
protected void after() {
GlobalExecutor.scheduleByCommon(this, 5_000L);
}
private void discovery(Collection<String> result) {
try {
Set<Member> tmp = new HashSet<>(memberManager.allMembers());
tmp.addAll(MemberUtils.readServerConf(Objects.requireNonNull(result)));
afterLookup(tmp);
}
catch (Exception e) {
Loggers.CLUSTER.error("The cluster self-detects a problem");
}
}
}
}

View File

@ -32,8 +32,6 @@ import java.util.Objects;
*/ */
public final class LookupFactory { public final class LookupFactory {
static final String DISCOVERY_SWITCH_NAME = "nacos.member.discovery";
static final String LOOKUP_MODE_TYPE = "nacos.core.member.lookup.type"; static final String LOOKUP_MODE_TYPE = "nacos.core.member.lookup.type";
static MemberLookup LOOK_UP = null; static MemberLookup LOOK_UP = null;
@ -52,10 +50,7 @@ public final class LookupFactory {
*/ */
ADDRESS_SERVER(2, "address-server"), ADDRESS_SERVER(2, "address-server"),
/** ;
* Self discovery addressing pattern
*/
DISCOVERY(3, "discovery");
private final int code; private final int code;
private final String name; private final String name;
@ -139,10 +134,6 @@ public final class LookupFactory {
LOOK_UP = new AddressServerMemberLookup(); LOOK_UP = new AddressServerMemberLookup();
return LOOK_UP; return LOOK_UP;
} }
if (LookupType.DISCOVERY.equals(type)) {
LOOK_UP = new DiscoveryMemberLookup();
return LOOK_UP;
}
// unpossible to run here // unpossible to run here
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
@ -155,10 +146,7 @@ public final class LookupFactory {
} }
} }
File file = new File(ApplicationUtils.getClusterConfFilePath()); File file = new File(ApplicationUtils.getClusterConfFilePath());
if (ApplicationUtils
.getProperty(DISCOVERY_SWITCH_NAME, Boolean.class, false)) {
return LookupType.DISCOVERY;
}
if (file.exists() || StringUtils.isNotBlank(ApplicationUtils.getMemberList())) { if (file.exists() || StringUtils.isNotBlank(ApplicationUtils.getMemberList())) {
return LookupType.FILE_CONFIG; return LookupType.FILE_CONFIG;
} }

View File

@ -18,8 +18,8 @@ package com.alibaba.nacos.core.controller;
import com.alibaba.nacos.common.model.RestResult; import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.consistency.cp.CPProtocol; import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.utils.Commons; import com.alibaba.nacos.core.utils.Commons;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
@ -34,8 +34,11 @@ import java.util.Map;
@RequestMapping(Commons.NACOS_CORE_CONTEXT + "/ops") @RequestMapping(Commons.NACOS_CORE_CONTEXT + "/ops")
public class CoreOpsController { public class CoreOpsController {
@Autowired private final ProtocolManager protocolManager;
private CPProtocol cpProtocol;
public CoreOpsController(ProtocolManager protocolManager) {
this.protocolManager = protocolManager;
}
// Temporarily overpassed the raft operations interface // Temporarily overpassed the raft operations interface
// { // {
@ -47,7 +50,8 @@ public class CoreOpsController {
@PostMapping(value = "/raft") @PostMapping(value = "/raft")
public RestResult<String> raftOps(@RequestBody Map<String, String> commands) { public RestResult<String> raftOps(@RequestBody Map<String, String> commands) {
return cpProtocol.execute(commands); CPProtocol protocol = protocolManager.getCpProtocol();
return protocol.execute(commands);
} }
} }

View File

@ -19,7 +19,6 @@ package com.alibaba.nacos.core.distributed.raft;
import com.alibaba.nacos.common.model.RestResult; import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ByteUtils; import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.common.utils.ConvertUtils; import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.DiskUtils;
import com.alibaba.nacos.common.utils.LoggerUtils; import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.consistency.LogProcessor; import com.alibaba.nacos.consistency.LogProcessor;
@ -47,9 +46,6 @@ import com.alibaba.nacos.core.distributed.raft.utils.RetryRunner;
import com.alibaba.nacos.core.notify.NotifyCenter; import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.core.utils.Loggers;
import com.alipay.remoting.InvokeCallback;
import com.alipay.remoting.rpc.RpcServer;
import com.alipay.remoting.rpc.protocol.AsyncUserProcessor;
import com.alipay.sofa.jraft.CliService; import com.alipay.sofa.jraft.CliService;
import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService; import com.alipay.sofa.jraft.RaftGroupService;
@ -61,19 +57,21 @@ import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.option.CliOptions; import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService; import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter; import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Slf4jReporter; import com.codahale.metrics.Slf4jReporter;
import org.slf4j.Logger;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.io.File;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collection; import java.util.Collection;
@ -83,7 +81,6 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -113,7 +110,7 @@ public class JRaftServer {
// Existential life cycle // Existential life cycle
private RpcServer rpcServer; private RpcServer rpcServer;
private BoltCliClientService cliClientService; private CliClientServiceImpl cliClientService;
private CliService cliService; private CliService cliService;
private Map<String, RaftGroupTuple> multiRaftGroup = new ConcurrentHashMap<>(); private Map<String, RaftGroupTuple> multiRaftGroup = new ConcurrentHashMap<>();
@ -123,7 +120,7 @@ public class JRaftServer {
private volatile boolean isShutdown = false; private volatile boolean isShutdown = false;
private Configuration conf; private Configuration conf;
private AsyncUserProcessor userProcessor; private RpcProcessor userProcessor;
private NodeOptions nodeOptions; private NodeOptions nodeOptions;
private Serializer serializer; private Serializer serializer;
private Collection<LogProcessor4CP> processors = Collections private Collection<LogProcessor4CP> processors = Collections
@ -175,7 +172,7 @@ public class JRaftServer {
CliOptions cliOptions = new CliOptions(); CliOptions cliOptions = new CliOptions();
this.cliClientService = new BoltCliClientService(); this.cliClientService = new CliClientServiceImpl();
this.cliClientService.init(cliOptions); this.cliClientService.init(cliOptions);
this.cliService = RaftServiceFactory.createAndInitCliService(cliOptions); this.cliService = RaftServiceFactory.createAndInitCliService(cliOptions);
} }
@ -197,14 +194,12 @@ public class JRaftServer {
} }
nodeOptions.setInitialConf(conf); nodeOptions.setInitialConf(conf);
rpcServer = new RpcServer(selfPort, true, false); rpcServer = RaftRpcServerFactory.createRaftRpcServer(new Endpoint(localPeerId.getIp(), localPeerId.getPort()),
JRaftUtils.addRaftRequestProcessors(rpcServer,
RaftExecutor.getRaftCoreExecutor(), RaftExecutor.getRaftCoreExecutor(),
RaftExecutor.getRaftCliServiceExecutor()); RaftExecutor.getRaftCliServiceExecutor());
rpcServer.registerUserProcessor( rpcServer.registerProcessor(new NacosAsyncProcessor(this, failoverRetries));
new NacosAsyncProcessor(this, failoverRetries));
if (!this.rpcServer.start()) { if (!this.rpcServer.init(null)) {
Loggers.RAFT.error("Fail to init [RpcServer]."); Loggers.RAFT.error("Fail to init [RpcServer].");
throw new RuntimeException("Fail to init [RpcServer]."); throw new RuntimeException("Fail to init [RpcServer].");
} }
@ -477,7 +472,6 @@ public class JRaftServer {
cliService.shutdown(); cliService.shutdown();
cliClientService.shutdown(); cliClientService.shutdown();
rpcServer.stop();
Loggers.RAFT.info("========= The raft protocol has been closed ========="); Loggers.RAFT.info("========= The raft protocol has been closed =========");
} }
@ -507,21 +501,26 @@ public class JRaftServer {
private void invokeToLeader(final String group, final Log request, private void invokeToLeader(final String group, final Log request,
final int timeoutMillis, FailoverClosure closure) { final int timeoutMillis, FailoverClosure closure) {
try { try {
final String leaderIp = Optional.ofNullable(getLeader(group)) final Endpoint leaderIp = Optional.ofNullable(getLeader(group))
.orElseThrow(() -> new NoLeaderException(group)).getEndpoint() .orElseThrow(() -> new NoLeaderException(group)).getEndpoint();
.toString();
final BytesHolder holder = BytesHolder.create(request.toByteArray()); final BytesHolder holder = BytesHolder.create(request.toByteArray());
cliClientService.getRpcClient() cliClientService.getRpcClient()
.invokeWithCallback(leaderIp, holder, new InvokeCallback() { .invokeAsync(leaderIp, holder, new InvokeCallback() {
@Override @Override
public void onResponse(Object o) { public void complete(Object o, Throwable ex) {
if (Objects.nonNull(ex)) {
closure.setThrowable(ex);
closure.run(new Status(RaftError.UNKNOWN, ex.getMessage()));
}
RestResult result = (RestResult) o; RestResult result = (RestResult) o;
if (result.ok()) { if (result.ok()) {
closure.setData(result.getData()); closure.setData(result.getData());
closure.run(Status.OK()); closure.run(Status.OK());
} }
else { else {
Throwable ex = (Throwable) result.getData(); ex = (Throwable) result.getData();
closure.setThrowable(ex); closure.setThrowable(ex);
closure.run( closure.run(
new Status(RaftError.UNKNOWN, ex.getMessage())); new Status(RaftError.UNKNOWN, ex.getMessage()));
@ -529,13 +528,7 @@ public class JRaftServer {
} }
@Override @Override
public void onException(Throwable e) { public Executor executor() {
closure.setThrowable(e);
closure.run(new Status(RaftError.UNKNOWN, e.getMessage()));
}
@Override
public Executor getExecutor() {
return RaftExecutor.getRaftCliServiceExecutor(); return RaftExecutor.getRaftCliServiceExecutor();
} }
}, timeoutMillis); }, timeoutMillis);
@ -605,10 +598,6 @@ public class JRaftServer {
return multiRaftGroup; return multiRaftGroup;
} }
BoltCliClientService getCliClientService() {
return cliClientService;
}
CliService getCliService() { CliService getCliService() {
return cliService; return cliService;
} }

View File

@ -40,7 +40,6 @@ import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RouteTable; import com.alipay.sofa.jraft.RouteTable;
import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.StateMachineAdapter; import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.entity.LeaderChangeContext; import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.LocalFileMetaOutter; import com.alipay.sofa.jraft.entity.LocalFileMetaOutter;

View File

@ -28,9 +28,8 @@ import com.alibaba.nacos.core.distributed.raft.RaftSysConstants;
import com.alibaba.nacos.core.distributed.raft.exception.NoLeaderException; import com.alibaba.nacos.core.distributed.raft.exception.NoLeaderException;
import com.alibaba.nacos.core.distributed.raft.exception.NoSuchRaftGroupException; import com.alibaba.nacos.core.distributed.raft.exception.NoSuchRaftGroupException;
import com.alibaba.nacos.core.distributed.raft.utils.BytesHolder; import com.alibaba.nacos.core.distributed.raft.utils.BytesHolder;
import com.alipay.remoting.AsyncContext; import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.remoting.BizContext; import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.remoting.rpc.protocol.AsyncUserProcessor;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -38,7 +37,7 @@ import java.util.concurrent.CompletableFuture;
/** /**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/ */
public class NacosAsyncProcessor extends AsyncUserProcessor<BytesHolder> { public class NacosAsyncProcessor implements RpcProcessor<BytesHolder> {
private static final String INTEREST_NAME = BytesHolder.class.getName(); private static final String INTEREST_NAME = BytesHolder.class.getName();
@ -51,12 +50,12 @@ public class NacosAsyncProcessor extends AsyncUserProcessor<BytesHolder> {
} }
@Override @Override
public void handleRequest(BizContext bizContext, AsyncContext asyncCtx, BytesHolder holder) { public void handleRequest(final RpcContext rpcCtx, BytesHolder holder) {
try { try {
Log log = Log.parseFrom(holder.getBytes()); Log log = Log.parseFrom(holder.getBytes());
final JRaftServer.RaftGroupTuple tuple = server.findTupleByGroup(log.getGroup()); final JRaftServer.RaftGroupTuple tuple = server.findTupleByGroup(log.getGroup());
if (Objects.isNull(tuple)) { if (Objects.isNull(tuple)) {
asyncCtx.sendResponse(RestResultUtils.failedWithException(new NoSuchRaftGroupException( rpcCtx.sendResponse(RestResultUtils.failedWithException(new NoSuchRaftGroupException(
"Could not find the corresponding Raft Group : " + log.getGroup()))); "Could not find the corresponding Raft Group : " + log.getGroup())));
return; return;
} }
@ -66,7 +65,7 @@ public class NacosAsyncProcessor extends AsyncUserProcessor<BytesHolder> {
CompletableFuture<Object> future = new CompletableFuture<>(); CompletableFuture<Object> future = new CompletableFuture<>();
server.commit(log, future, retryCnt).whenComplete((result, t) -> { server.commit(log, future, retryCnt).whenComplete((result, t) -> {
if (Objects.nonNull(t)) { if (Objects.nonNull(t)) {
asyncCtx.sendResponse(RestResultUtils.failedWithException(t)); rpcCtx.sendResponse(RestResultUtils.failedWithException(t));
return; return;
} }
if (result instanceof LogFuture) { if (result instanceof LogFuture) {
@ -77,7 +76,7 @@ public class NacosAsyncProcessor extends AsyncUserProcessor<BytesHolder> {
} else { } else {
r = RestResultUtils.success(f.getError()); r = RestResultUtils.success(f.getError());
} }
asyncCtx.sendResponse(r); rpcCtx.sendResponse(r);
return; return;
} }
if (result instanceof GetResponse) { if (result instanceof GetResponse) {
@ -88,15 +87,15 @@ public class NacosAsyncProcessor extends AsyncUserProcessor<BytesHolder> {
} else { } else {
r = RestResultUtils.success(response.toByteArray()); r = RestResultUtils.success(response.toByteArray());
} }
asyncCtx.sendResponse(r); rpcCtx.sendResponse(r);
} }
}); });
} }
else { else {
asyncCtx.sendResponse(RestResultUtils.failedWithException(new NoLeaderException(log.getGroup()))); rpcCtx.sendResponse(RestResultUtils.failedWithException(new NoLeaderException(log.getGroup())));
} }
} catch (Exception e) { } catch (Exception e) {
asyncCtx.sendResponse(RestResultUtils.failedWithException(e)); rpcCtx.sendResponse(RestResultUtils.failedWithException(e));
} }
} }

View File

@ -19,33 +19,12 @@ package com.alibaba.nacos.core.distributed.raft.utils;
import com.alibaba.nacos.common.utils.DiskUtils; import com.alibaba.nacos.common.utils.DiskUtils;
import com.alibaba.nacos.consistency.entity.Log; import com.alibaba.nacos.consistency.entity.Log;
import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.core.utils.Loggers;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.rpc.RpcServer;
import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.impl.PingRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.AddLearnersRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.AddPeerRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.ChangePeersRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.GetLeaderRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.GetPeersRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.RemoveLearnersRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.RemovePeerRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.ResetLearnersRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.ResetPeerRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.SnapshotRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.TransferLeaderRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.core.GetFileRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.core.InstallSnapshotRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.core.ReadIndexRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.core.RequestVoteRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.core.TimeoutNowRequestProcessor;
import java.io.File; import java.io.File;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -89,31 +68,4 @@ public class JRaftUtils {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
public static void addRaftRequestProcessors(final RpcServer rpcServer, final Executor raftExecutor,
final Executor cliExecutor) {
// raft core processors
final AppendEntriesRequestProcessor appendEntriesRequestProcessor = new AppendEntriesRequestProcessor(
raftExecutor);
rpcServer.addConnectionEventProcessor(ConnectionEventType.CLOSE, appendEntriesRequestProcessor);
rpcServer.registerUserProcessor(appendEntriesRequestProcessor);
rpcServer.registerUserProcessor(new GetFileRequestProcessor(raftExecutor));
rpcServer.registerUserProcessor(new InstallSnapshotRequestProcessor(raftExecutor));
rpcServer.registerUserProcessor(new RequestVoteRequestProcessor(raftExecutor));
rpcServer.registerUserProcessor(new PingRequestProcessor());
rpcServer.registerUserProcessor(new TimeoutNowRequestProcessor(raftExecutor));
rpcServer.registerUserProcessor(new ReadIndexRequestProcessor(raftExecutor));
// raft cli service
rpcServer.registerUserProcessor(new AddPeerRequestProcessor(cliExecutor));
rpcServer.registerUserProcessor(new RemovePeerRequestProcessor(cliExecutor));
rpcServer.registerUserProcessor(new ResetPeerRequestProcessor(cliExecutor));
rpcServer.registerUserProcessor(new ChangePeersRequestProcessor(cliExecutor));
rpcServer.registerUserProcessor(new GetLeaderRequestProcessor(cliExecutor));
rpcServer.registerUserProcessor(new SnapshotRequestProcessor(cliExecutor));
rpcServer.registerUserProcessor(new TransferLeaderRequestProcessor(cliExecutor));
rpcServer.registerUserProcessor(new GetPeersRequestProcessor(cliExecutor));
rpcServer.registerUserProcessor(new AddLearnersRequestProcessor(cliExecutor));
rpcServer.registerUserProcessor(new RemoveLearnersRequestProcessor(cliExecutor));
rpcServer.registerUserProcessor(new ResetLearnersRequestProcessor(cliExecutor));
}
} }

Binary file not shown.

Binary file not shown.

16
pom.xml
View File

@ -582,8 +582,14 @@
<dependency> <dependency>
<groupId>com.alipay.sofa</groupId> <groupId>com.alipay.sofa</groupId>
<artifactId>jraft-core</artifactId> <artifactId>jraft-core</artifactId>
<version>1.3.0</version> <version>1.3.2.beta1</version>
<scope>system</scope>
<systemPath>${basedir}/libs/jraft-core-1.3.2.beta1.jar</systemPath>
<exclusions> <exclusions>
<exclusion>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
</exclusion>
<exclusion> <exclusion>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId> <artifactId>log4j-api</artifactId>
@ -603,6 +609,14 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>rpc-grpc-impl</artifactId>
<version>1.3.2.beta1</version>
<scope>system</scope>
<systemPath>${basedir}/libs/rpc-grpc-impl-1.3.2.beta1.jar</systemPath>
</dependency>
<!-- hessian --> <!-- hessian -->
<dependency> <dependency>

View File

@ -18,12 +18,9 @@ package com.alibaba.nacos.test.core.cluster;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.utils.DiskUtils; import com.alibaba.nacos.common.utils.DiskUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.cluster.lookup.AddressServerMemberLookup; import com.alibaba.nacos.core.cluster.lookup.AddressServerMemberLookup;
import com.alibaba.nacos.core.cluster.lookup.DiscoveryMemberLookup;
import com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup; import com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup;
import com.alibaba.nacos.core.cluster.lookup.LookupFactory; import com.alibaba.nacos.core.cluster.lookup.LookupFactory;
import com.alibaba.nacos.core.cluster.MemberLookup; import com.alibaba.nacos.core.cluster.MemberLookup;
@ -42,20 +39,10 @@ import org.springframework.core.env.StandardEnvironment;
import org.springframework.mock.web.MockServletContext; import org.springframework.mock.web.MockServletContext;
import java.io.File; import java.io.File;
import java.io.StringReader;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
@ -154,25 +141,6 @@ public class MemberLookup_ITCase extends BaseTest {
} }
} }
@Test
public void test_d_lookup_discovery() throws Exception {
ApplicationUtils.setIsStandalone(false);
System.setProperty("nacos.member.discovery", "true");
System.out.println(ApplicationUtils.getClusterConfFilePath());
System.out.println(new File(ApplicationUtils.getClusterConfFilePath()).exists());
try {
LookupFactory.createLookUp(memberManager);
}
catch (Throwable ignore) {
}
System.setProperty("nacos.member.discovery", "false");
MemberLookup lookup = LookupFactory.getLookUp();
System.out.println(lookup);
Assert.assertTrue(lookup instanceof DiscoveryMemberLookup);
func(lookup);
}
private void func(MemberLookup lookup) throws Exception { private void func(MemberLookup lookup) throws Exception {
func(lookup, 3); func(lookup, 3);
} }