fix: fix expansion issues

chuntaojun 2020-05-09 20:32:55 +08:00
parent 5879b03c7d
commit 6a6725a269
10 changed files with 177 additions and 336232 deletions

View File

@ -59,6 +59,7 @@ import java.lang.reflect.Method;
import java.net.URI;
import java.security.AccessControlException;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
/**
@ -93,6 +94,7 @@ public class TransferToLeaderFilter implements Filter {
protected void init() {
LogUtil.defaultLog.info("Open the request and forward it to the leader");
listenerLeaderStatus();
listenerSelfInCluster();
registerSubscribe();
}
@ -214,7 +216,6 @@ public class TransferToLeaderFilter implements Filter {
new Observer() {
@Override
public void update(Observable o, Object arg) {
openService = true;
final String raftLeader = String.valueOf(arg);
boolean found = false;
for (Map.Entry<String, Member> entry : memberManager
@ -235,6 +236,24 @@ public class TransferToLeaderFilter implements Filter {
});
}
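// Subscribe to raft membership changes so that openService is turned on only
// when this node is part of the cluster and a leader is known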
private void listenerSelfInCluster() {
protocol.protocolMetaData().subscribe(Constants.CONFIG_MODEL_RAFT_GROUP,
com.alibaba.nacos.consistency.cp.Constants.RAFT_GROUP_MEMBER,
new Observer() {
@Override
public void update(Observable o, Object arg) {
final List<String> peers = (List<String>) arg;
final Member self = memberManager.getSelf();
final String raftAddress = self.getIp() + ":" + self
.getExtendVal(MemberMetaDataConstants.RAFT_PORT);
// The node can serve external requests only once it is part of the cluster
// and the current leader has been elected
openService = peers.contains(raftAddress)
&& StringUtils.isNotBlank(leaderServer);
}
});
}
private void registerSubscribe() {
NotifyCenter.registerSubscribe(new SmartSubscribe() {

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.consistency;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.consistency.entity.GetRequest;
import com.alibaba.nacos.consistency.entity.GetResponse;
import com.alibaba.nacos.consistency.entity.Log;

View File

@ -114,9 +114,7 @@ public class StartingSpringApplicationRunListener
ConfigurableEnvironment env = context.getEnvironment();
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
closeExecutor();
logFilePath();
@ -160,6 +158,8 @@ public class StartingSpringApplicationRunListener
ThreadPoolManager.shutdown();
WatchFileCenter.shutdown();
NotifyCenter.shutdown();
closeExecutor();
}
/**
@ -199,6 +199,12 @@ public class StartingSpringApplicationRunListener
}
}
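// Shut down the scheduled executor if it has not been stopped yet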
private void closeExecutor() {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
}
private void logStarting() {
if (!ApplicationUtils.getStandaloneMode()) {

View File

@ -18,7 +18,14 @@ package com.alibaba.nacos.core.distributed.raft;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.core.distributed.raft.utils.RaftExecutor;
import com.alibaba.nacos.core.utils.Loggers;
import com.alipay.sofa.jraft.CliService;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RouteTable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -27,6 +34,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@ -50,11 +58,13 @@ public class EnlargeShrinksCapacity {
private final JRaftServer server;
private FutureTask<Boolean> task;
private Selector selector;
private final CliService cliService;
private AtomicBoolean isFirst = new AtomicBoolean(true);
public EnlargeShrinksCapacity(JRaftServer server) throws IOException {
this.server = server;
this.cliService = server.getCliService();
this.selector = Selector.open();
}
@ -105,7 +115,7 @@ public class EnlargeShrinksCapacity {
LoggerUtils.printIfDebugEnabled(Loggers.RAFT, "The probe port is accessible to the node : {}", alreadyConnect);
if (!alreadyConnect.isEmpty()) {
server.peersChange(alreadyConnect);
peersChange(alreadyConnect);
}
ThreadUtils.sleep(1000L);
@ -116,6 +126,52 @@ public class EnlargeShrinksCapacity {
});
}
void peersChange(Set<String> addresses) {
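// Push the probed addresses into every raft group; only the group leader is allowed to change the membership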
for (Map.Entry<String, JRaftServer.RaftGroupTuple> entry : server.getMultiRaftGroup().entrySet()) {
final String groupId = entry.getKey();
final Node node = entry.getValue().getNode();
if (!node.isLeader()) {
return;
}
final Configuration oldConf = RouteTable.getInstance()
.getConfiguration(groupId);
final Configuration newConf = new Configuration();
for (String address : addresses) {
newConf.addPeer(PeerId.parsePeer(address));
}
if (Objects.equals(oldConf, newConf)) {
return;
}
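// Retry the changePeers call up to three times, refreshing the route table asynchronously once it succeeds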
for (int i = 0; i < 3; i++) {
try {
Status status = cliService.changePeers(groupId, oldConf, newConf);
if (status.isOk()) {
Loggers.RAFT
.info("Node update success, groupId : {}, oldConf : {}, newConf : {}, status : {}, Try again the {} time",
groupId, oldConf, newConf, status, i + 1);
RaftExecutor.executeByCommon(() -> server.refreshRouteTable(groupId));
return;
}
else {
Loggers.RAFT
.error("Nodes update failed, groupId : {}, oldConf : {}, newConf : {}, status : {}, Try again the {} time",
groupId, oldConf, newConf, status, i + 1);
ThreadUtils.sleep(500L);
}
}
catch (Exception e) {
Loggers.RAFT
.error("An exception occurred during the node change operation : {}",
e);
}
}
}
}
private Set<String> onConnect() {
Set<String> connectPeer = new HashSet<>();
try {
@ -171,5 +227,4 @@ public class EnlargeShrinksCapacity {
}
}
}

View File

@ -104,7 +104,6 @@ public class JRaftProtocol
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean shutdowned = new AtomicBoolean(false);
private final EnlargeShrinksCapacity enlargeShrinksCapacity;
private RaftConfig raftConfig;
private JRaftServer raftServer;
private JRaftOps jRaftOps;
@ -118,7 +117,6 @@ public class JRaftProtocol
this.memberManager = memberManager;
this.raftServer = new JRaftServer(failoverRetries);
this.jRaftOps = new JRaftOps(raftServer);
this.enlargeShrinksCapacity = new EnlargeShrinksCapacity(raftServer);
}
@Override
@ -224,7 +222,6 @@ public class JRaftProtocol
@Override
public void memberChange(Set<String> addresses) {
this.raftConfig.setMembers(raftConfig.getSelfMember(), addresses);
enlargeShrinksCapacity.execute(this.raftConfig.getMembers());
}
@Override

View File

@ -61,6 +61,7 @@ import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
@ -69,6 +70,7 @@ import com.alipay.sofa.jraft.util.BytesUtil;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Slf4jReporter;
import org.slf4j.Logger;
import org.springframework.util.CollectionUtils;
import java.io.File;
@ -77,7 +79,6 @@ import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -236,34 +237,34 @@ public class JRaftServer {
if (multiRaftGroup.containsKey(groupName)) {
throw new DuplicateRaftGroupException(groupName);
}
final String logUri = Paths.get(parentPath, groupName, "log").toString();
final String snapshotUri = Paths.get(parentPath, groupName, "snapshot")
.toString();
final String metaDataUri = Paths.get(parentPath, groupName, "meta-data")
.toString();
// Initialize the raft file storage path for different services
try {
DiskUtils.forceMkdir(new File(logUri));
DiskUtils.forceMkdir(new File(snapshotUri));
DiskUtils.forceMkdir(new File(metaDataUri));
}
catch (Exception e) {
Loggers.RAFT.error("Init Raft-File dir have some error : {}", e);
throw new RuntimeException(e);
}
// Ensure that each Raft Group has its own configuration and NodeOptions
Configuration configuration = conf.copy();
NodeOptions copy = nodeOptions.copy();
JRaftUtils.initDirectory(parentPath, groupName, copy);
if (!registerSelfToCluster(groupName, localPeerId, configuration)) {
// If registration fails, remove this node from the configuration first and
// then retry the registration in the background
configuration.removePeer(localPeerId);
RaftExecutor.executeByCommon(() -> {
Configuration c = configuration.copy();
c.addPeer(localPeerId);
for ( ; ; ) {
if (registerSelfToCluster(groupName, localPeerId, c)) {
break;
}
ThreadUtils.sleep(1000L);
}
});
}
// The LogProcessor is passed into the StateMachine; when the StateMachine's
// onApply is triggered, it actually delegates to the LogProcessor's onApply
NacosStateMachine machine = new NacosStateMachine(this, processor);
copy.setLogUri(logUri);
copy.setRaftMetaUri(metaDataUri);
copy.setSnapshotUri(snapshotUri);
copy.setFsm(machine);
copy.setInitialConf(configuration);
@ -399,45 +400,33 @@ public class JRaftServer {
return future;
}
void peersChange(Set<String> addresses) {
for (Map.Entry<String, RaftGroupTuple> entry : multiRaftGroup.entrySet()) {
final String groupId = entry.getKey();
final Node node = entry.getValue().node;
final Configuration oldConf = RouteTable.getInstance()
.getConfiguration(groupId);
final Configuration newConf = new Configuration();
for (String address : addresses) {
newConf.addPeer(PeerId.parsePeer(address));
boolean registerSelfToCluster(String groupId, PeerId selfIp, Configuration conf) {
PeerId leader = new PeerId();
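// Ask the existing cluster for its current leader, retrying up to five times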
for (int i = 0; i < 5; i ++) {
Status status = cliService.getLeader(groupId, conf, leader);
if (status.isOk()) {
break;
}
Loggers.RAFT.warn("get leader failed : {}", status);
}
if (Objects.equals(oldConf, newConf)) {
return;
}
// No leader was found, so this is a new cluster; follow the normal initialization logic
if (leader.isEmpty()) {
return true;
}
for (int i = 0; i < 3; i++) {
try {
Status status = cliService.changePeers(groupId, oldConf, newConf);
if (status.isOk()) {
Loggers.RAFT
.info("Node update success, groupId : {}, oldConf : {}, newConf : {}, status : {}, Try again the {} time",
groupId, oldConf, newConf, status, i + 1);
RaftExecutor.executeByCommon(() -> refreshRouteTable(groupId));
return;
}
else {
Loggers.RAFT
.error("Nodes update failed, groupId : {}, oldConf : {}, newConf : {}, status : {}, Try again the {} time",
groupId, oldConf, newConf, status, i + 1);
ThreadUtils.sleep(500L);
}
}
catch (Exception e) {
Loggers.RAFT
.error("An exception occurred during the node change operation : {}",
e);
}
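// Add this node to the existing cluster as a peer, retrying up to five times before giving up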
for (int i = 0; i < 5; i++) {
Status status = cliService.addPeer(groupId, conf, selfIp);
if (status.isOk()) {
Loggers.RAFT.info("reigister self to cluster success");
return true;
} else {
Loggers.RAFT.error("register self to cluster has error : {}", status);
ThreadUtils.sleep(1000L);
}
}
return false;
}
protected PeerId getLeader(final String raftGroupId) {
@ -469,8 +458,16 @@ public class JRaftServer {
Loggers.RAFT
.info("========= The raft protocol is starting to close =========");
RouteTable instance = RouteTable.getInstance();
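// Remove this node from every raft group before shutting the groups down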
for (Map.Entry<String, RaftGroupTuple> entry : multiRaftGroup.entrySet()) {
final RaftGroupTuple tuple = entry.getValue();
final String groupId = entry.getKey();
final Configuration conf = instance.getConfiguration(groupId);
cliService.removePeer(groupId, conf, localPeerId);
tuple.node.shutdown();
tuple.raftGroupService.shutdown();
if (tuple.regionMetricsReporter != null) {
@ -549,8 +546,7 @@ public class JRaftServer {
}
}
private void refreshRouteTable(String group) {
void refreshRouteTable(String group) {
if (isShutdown) {
return;
}
@ -560,7 +556,7 @@ public class JRaftServer {
try {
RouteTable instance = RouteTable.getInstance();
Configuration oldConf = instance.getConfiguration(groupName);
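// selectLeader may return null before a leader is known; fall back to an empty PeerId to avoid a NullPointerException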
String oldLeader = instance.selectLeader(groupName).getEndpoint().toString();
String oldLeader = Optional.ofNullable(instance.selectLeader(groupName)).orElse(PeerId.emptyPeer()).getEndpoint().toString();
status = instance.refreshConfiguration(this.cliClientService, groupName,
rpcRequestTimeoutMs);

View File

@ -16,10 +16,13 @@
package com.alibaba.nacos.core.distributed.raft.utils;
import com.alibaba.nacos.common.utils.DiskUtils;
import com.alibaba.nacos.consistency.entity.Log;
import com.alibaba.nacos.core.utils.Loggers;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.rpc.RpcServer;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.impl.PingRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.AddLearnersRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.cli.AddPeerRequestProcessor;
@ -39,6 +42,8 @@ import com.alipay.sofa.jraft.rpc.impl.core.ReadIndexRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.core.RequestVoteRequestProcessor;
import com.alipay.sofa.jraft.rpc.impl.core.TimeoutNowRequestProcessor;
import java.io.File;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
@ -49,6 +54,29 @@ import java.util.stream.Collectors;
@SuppressWarnings("all")
public class JRaftUtils {
public static final void initDirectory(String parentPath, String groupName, NodeOptions copy) {
final String logUri = Paths.get(parentPath, groupName, "log").toString();
final String snapshotUri = Paths.get(parentPath, groupName, "snapshot")
.toString();
final String metaDataUri = Paths.get(parentPath, groupName, "meta-data")
.toString();
// Initialize the raft file storage path for different services
try {
DiskUtils.forceMkdir(new File(logUri));
DiskUtils.forceMkdir(new File(snapshotUri));
DiskUtils.forceMkdir(new File(metaDataUri));
}
catch (Exception e) {
Loggers.RAFT.error("Init Raft-File dir have some error : {}", e);
throw new RuntimeException(e);
}
copy.setLogUri(logUri);
copy.setRaftMetaUri(metaDataUri);
copy.setSnapshotUri(snapshotUri);
}
public static final Log injectExtendInfo(Log log, final String operate) {
Log gLog = Log.newBuilder(log)
.putExtendInfo(JRaftConstants.JRAFT_EXTEND_INFO_KEY, operate)

File diff suppressed because it is too large

View File

@ -37,8 +37,6 @@ public class InetUtils_ITCase {
static {
System.setProperty("nacos.core.inet.auto-refresh", "3");
// Triggers a static method block call
InetUtils.getSelfIp();
}
@Test

View File

@ -1,74 +1,12 @@
#*************** Spring Boot Related Configurations ***************#
### Default web context path:
server.servlet.contextPath=/nacos
### Default web server port:
# server.port=8848
#*************** Network Related Configurations ***************#
### If prefer hostname over ip for Nacos server addresses in cluster.conf:
# nacos.inetutils.prefer-hostname-over-ip=false
### Specify local server's IP:
# nacos.inetutils.ip-address=
# spring
server.port=8848
nacos.standalone=true
#*************** Config Module Related Configurations ***************#
### If user MySQL as datasource:
# spring.datasource.platform=mysql
### Count of DB:
# db.num=1
### Connect URL of DB:
# db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
# db.user=root
# db.password=1017
#*************** Naming Module Related Configurations ***************#
### Data dispatch task execution period in milliseconds:
# nacos.naming.distro.taskDispatchPeriod=200
### Data count of batch sync task:
# nacos.naming.distro.batchSyncKeyCount=1000
### Retry delay in milliseconds if sync task failed:
# nacos.naming.distro.syncRetryDelay=5000
### If enable data warmup. If set to false, the server would accept request without local data preparation:
# nacos.naming.data.warmup=true
### If enable the instance auto expiration, kind like of health check of instance:
# nacos.naming.expireInstance=true
nacos.naming.empty-service.auto-clean=true
nacos.naming.empty-service.clean.initial-delay-ms=50000
nacos.naming.empty-service.clean.period-time-ms=30000
#*************** CMDB Module Related Configurations ***************#
### The interval to dump external CMDB in seconds:
# nacos.cmdb.dumpTaskInterval=3600
### The interval of polling data change event in seconds:
# nacos.cmdb.eventTaskInterval=10
### The interval of loading labels in seconds:
# nacos.cmdb.labelTaskInterval=300
### If turn on data loading task:
# nacos.cmdb.loadDataAtStart=false
#*************** Metrics Related Configurations ***************#
### Metrics for prometheus
#management.endpoints.web.exposure.include=*
### Metrics for elastic search
management.metrics.export.elastic.enabled=false
#management.metrics.export.elastic.host=http://localhost:9200
### Metrics for influx
# metrics for influx
management.metrics.export.influx.enabled=false
#management.metrics.export.influx.db=springboot
#management.metrics.export.influx.uri=http://localhost:8086
@ -76,22 +14,11 @@ management.metrics.export.influx.enabled=false
#management.metrics.export.influx.consistency=one
#management.metrics.export.influx.compressed=true
#*************** Access Log Related Configurations ***************#
### If turn on the access log:
server.tomcat.accesslog.enabled=true
### The access log pattern:
server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D %{User-Agent}i
### The directory of access log:
server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D
# default current work dir
server.tomcat.basedir=
#*************** Access Control Related Configurations ***************#
### If enable spring security, this option is deprecated in 1.2.0:
#spring.security.enabled=false
### The ignore urls of auth, is deprecated in 1.2.0:
nacos.security.ignore.urls=/,/error,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/v1/auth/**,/v1/console/health/**,/actuator/**,/v1/console/server/**
@ -101,60 +28,12 @@ nacos.core.auth.system.type=nacos
### If turn on auth system:
nacos.core.auth.enabled=false
nacos.core.auth.caching.enabled=false
### The token expiration in seconds:
nacos.core.auth.default.token.expire.seconds=18000
### The default token:
nacos.core.auth.default.token.secret.key=SecretKey012345678901234567890123456789012345678901234567890123456789
### Turn on/off caching of auth information. By turning on this switch, the update of auth information would have a 15 seconds delay.
nacos.core.auth.caching.enabled=true
#*************** Istio Related Configurations ***************#
### If turn on the MCP server:
nacos.istio.mcp.server.enabled=false
###*************** Add from 1.3.0-BETA ***************###
#*************** Core Related Configurations ***************#
### set the WorkerID manually
# nacos.core.snowflake.worker-id=
### Member-MetaData
# nacos.core.member.meta.site=
# nacos.core.member.meta.adweight=
# nacos.core.member.meta.weight=
### MemberLookup
### Addressing pattern category, If set, the priority is highest
# nacos.core.member.lookup.type=[file,address-server,discovery]
## Set the cluster list with a configuration file or command-line argument
# nacos.member.list=192.168.16.101:8847?raft_port=8807,192.168.16.101?raft_port=8808,192.168.16.101:8849?raft_port=8809
## for DiscoveryMemberLookup
# If you want to use cluster node self-discovery, turn this parameter on
# nacos.member.discovery=false
## for AddressServerMemberLookup
# Maximum number of retries to query the address server upon initialization
# nacos.core.address-server.retry=5
#*************** JRaft Related Configurations ***************#
### Sets the Raft cluster election timeout, default value is 5 second
# nacos.core.protocol.raft.data.election_timeout_ms=5000
### Sets the amount of time the Raft snapshot will execute periodically, default is 30 minute
# nacos.core.protocol.raft.data.snapshot_interval_secs=30
### Requested retries, default value is 1
# nacos.core.protocol.raft.data.request_failoverRetries=1
### raft internal worker threads
# nacos.core.protocol.raft.data.core_thread_num=8
### Number of threads required for raft business request processing
# nacos.core.protocol.raft.data.cli_service_thread_num=4
### raft linear read strategy, defaults to index
# nacos.core.protocol.raft.data.read_index_type=ReadOnlySafe
### rpc request timeout, default 5 seconds
# nacos.core.protocol.raft.data.rpc_request_timeout_ms=5000
tldSkipPatterns=derbyLocale_*.jar,jaxb-api.jar,jsr173_1.0_api.jar,jaxb1-impl.jar,activation.jar