fix: fix some bugs

chuntaojun 2020-03-26 20:32:52 +08:00
parent 16ba537a37
commit ce6dc51bcf
26 changed files with 13228 additions and 120 deletions

View File

@ -1,5 +1,6 @@
package com.alibaba.nacos.client;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.beat.BeatReactor;
@ -38,7 +39,7 @@ public class BeatReactorTest {
beatInfo.setScheduled(false);
beatInfo.setPeriod(1000L);
Mockito.doReturn(0L).when(namingProxy).sendBeat(beatInfo, true);
Mockito.doReturn(new JSONObject()).when(namingProxy).sendBeat(beatInfo, true);
beatReactor.addBeatInfo("testService", beatInfo);
Assert.assertEquals(1, getActiveThread(beatReactor));
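The test hunk above tracks a signature change: the stubbed NamingProxy.sendBeat now returns a JSONObject instead of a primitive long, so the Mockito stub returns new JSONObject(). A minimal, hedged sketch of stubbing such a method — the NamingProxy interface below is a simplified stand-in, not the real client class:

import com.alibaba.fastjson.JSONObject;
import org.mockito.Mockito;

class SendBeatStubSketch {

    // Assumed minimal shape; the real NamingProxy exposes a richer API.
    interface NamingProxy {
        JSONObject sendBeat(Object beatInfo, boolean lightBeatEnabled);
    }

    static NamingProxy stubbedProxy() {
        NamingProxy proxy = Mockito.mock(NamingProxy.class);
        // Return an empty JSON object, mirroring the updated stub in the test.
        Mockito.doReturn(new JSONObject())
                .when(proxy)
                .sendBeat(Mockito.any(), Mockito.anyBoolean());
        return proxy;
    }
}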

View File

@ -50,6 +50,7 @@ public abstract class BaseHttpClient {
try {
final String body = EntityUtils.toString(response.getEntity());
HttpRestResult<T> resResult = new HttpRestResult<T>();
resResult.setCode(response.getStatusLine().getStatusCode());
resResult.setHttpCode(response.getStatusLine().getStatusCode());
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
RestResult<T> data = ResponseHandler.convert(body, reference);
@ -82,6 +83,7 @@ public abstract class BaseHttpClient {
try {
final String body = EntityUtils.toString(response.getEntity());
HttpRestResult<T> resResult = new HttpRestResult<T>();
resResult.setCode(response.getStatusLine().getStatusCode());
resResult.setHttpCode(response.getStatusLine().getStatusCode());
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
RestResult<T> data = ResponseHandler.convert(body, reference);

View File

@ -48,7 +48,6 @@ import com.alibaba.nacos.core.notify.NotifyCenter;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.StringUtils;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Conditional;
import org.springframework.dao.DataIntegrityViolationException;
@ -90,7 +89,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP implements B
dataSourceService.cleanAndReopenDerby();
jdbcTemplate = dataSourceService.getJdbcTemplate();
transactionTemplate = dataSourceService.getTransactionTemplate();
selfIp = memberManager.self().getAddress();
selfIp = memberManager.getSelf().getAddress();
NotifyCenter.registerPublisher(RaftDBErrorEvent::new, RaftDBErrorEvent.class);
defaultLog.info("use DistributedTransactionServicesImpl");
}

View File

@ -26,24 +26,24 @@
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf-java.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc-java.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.xolstice.maven.plugins</groupId>-->
<!-- <artifactId>protobuf-maven-plugin</artifactId>-->
<!-- <version>0.5.0</version>-->
<!-- <configuration>-->
<!-- <protocArtifact>com.google.protobuf:protoc:${protobuf-java.version}:exe:${os.detected.classifier}</protocArtifact>-->
<!-- <pluginId>grpc-java</pluginId>-->
<!-- <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc-java.version}:exe:${os.detected.classifier}</pluginArtifact>-->
<!-- </configuration>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <goals>-->
<!-- <goal>compile</goal>-->
<!-- <goal>compile-custom</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>

View File

@ -126,16 +126,15 @@ public class Member {
Member that = (Member) o;
if (StringUtils.isAnyBlank(address, that.address)) {
return port == that.port &&
Objects.equals(ip, that.ip);
StringUtils.equals(ip, that.ip);
}
return StringUtils.equals(address, that.address);
}
@Override
public String toString() {
return "Member{" +
"address='" + address + '\'' +
'}';
return "Member{" + "ip='" + ip + '\'' + ", port=" + port + ", state=" + state
+ '}';
}
@Override
@ -174,7 +173,9 @@ public class Member {
public Member build() {
Member serverNode = new Member();
serverNode.extendInfo.putAll(this.extendInfo);
if (Objects.nonNull(this.extendInfo)) {
serverNode.extendInfo.putAll(this.extendInfo);
}
serverNode.state = this.state;
serverNode.ip = this.ip;
serverNode.port = this.port;
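Two small hardening changes show up in the Member hunks: equals falls back to the null-safe StringUtils.equals for the ip comparison, and the builder copies extendInfo only when it is non-null. A rough sketch of the builder guard under those assumptions (class and field names simplified, not the actual Member API):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class MemberBuilderSketch {

    private Map<String, Object> extendInfo = new HashMap<>();

    MemberBuilderSketch extendInfo(Map<String, Object> extendInfo) {
        this.extendInfo = extendInfo; // callers may legitimately pass null
        return this;
    }

    Map<String, Object> build() {
        Map<String, Object> node = new HashMap<>();
        // Only copy when extendInfo was actually provided, as the patch does.
        if (Objects.nonNull(this.extendInfo)) {
            node.putAll(this.extendInfo);
        }
        return node;
    }
}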

View File

@ -59,7 +59,7 @@ public interface MemberManager {
*
* @return {@link Member}
*/
Member self();
Member getSelf();
/**
* this node ip is the first in node collection

View File

@ -36,4 +36,9 @@ public enum NodeState {
*/
DOWN,
/**
* The node is isolated
*/
ISOLATION,
}

View File

@ -29,7 +29,9 @@ import com.alibaba.nacos.core.cluster.task.MemberDeadBroadcastTask;
import com.alibaba.nacos.core.cluster.task.MemberPingTask;
import com.alibaba.nacos.core.cluster.task.MemberPullTask;
import com.alibaba.nacos.core.cluster.task.MemberShutdownTask;
import com.alibaba.nacos.core.notify.Event;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.notify.listener.Subscribe;
import com.alibaba.nacos.core.utils.ConcurrentHashSet;
import com.alibaba.nacos.core.utils.Constants;
import com.alibaba.nacos.core.utils.GlobalExecutor;
@ -40,6 +42,7 @@ import com.alibaba.nacos.core.utils.ApplicationUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -52,6 +55,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.servlet.ServletContext;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
@ -100,6 +104,7 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
public void init() {
Loggers.CORE.info("Nacos-related cluster resource initialization");
this.port = ApplicationUtils.getProperty("server.port", Integer.class, 8848);
this.localAddress = InetUtils.getSelfIp() + ":" + port;
@ -108,6 +113,18 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
NotifyCenter.registerPublisher(NodeChangeEvent::new, NodeChangeEvent.class);
NotifyCenter.registerPublisher(ServerInitializedEvent::new, ServerInitializedEvent.class);
NotifyCenter.registerPublisher(IsolationEvent::new, IsolationEvent.class);
NotifyCenter.registerSubscribe(new Subscribe<IsolationEvent>() {
@Override
public void onEvent(IsolationEvent event) {
self.setState(NodeState.ISOLATION);
}
@Override
public Class<? extends Event> subscribeType() {
return IsolationEvent.class;
}
});
// init nacos core sys
@ -131,7 +148,6 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
if (!serverList.containsKey(address)) {
memberJoin(new ArrayList<>(Arrays.asList(newMember)));
return;
}
serverList.computeIfPresent(address, new BiFunction<String, Member, Member>() {
@ -172,7 +188,7 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
}
@Override
public Member self() {
public Member getSelf() {
if (Objects.isNull(self)) {
self = serverList.get(localAddress);
}
@ -181,11 +197,12 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
@Override
public Collection<Member> allMembers() {
return serverList.values();
// We need to do a copy to avoid affecting the real data
return new ArrayList<>(serverList.values());
}
@Override
public synchronized void memberJoin(Collection<Member> members) {
public void memberJoin(Collection<Member> members) {
for (Iterator<Member> iterator = members.iterator(); iterator.hasNext(); ) {
final Member newMember = iterator.next();
final String address = newMember.getAddress();
@ -200,8 +217,10 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
}
// Ensure that the node is created only once
serverList.computeIfAbsent(address, s -> newMember);
memberAddressInfos.add(address);
serverList.computeIfAbsent(address, s -> {
memberAddressInfos.add(address);
return newMember;
});
serverList.computeIfPresent(address, new BiFunction<String, Member, Member>() {
@Override
public Member apply(String s, Member member) {
@ -222,7 +241,6 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
.changeNodes(members)
.allNodes(allMembers())
.build());
}
@Override
@ -230,17 +248,12 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
for (Iterator<Member> iterator = members.iterator(); iterator.hasNext(); ) {
Member member = iterator.next();
final String address = member.getAddress();
if (Objects.equals(address, localAddress)) {
if (StringUtils.equals(address, localAddress)) {
iterator.remove();
continue;
}
memberAddressInfos.remove(address);
serverList.computeIfPresent(address, new BiFunction<String, Member, Member>() {
@Override
public Member apply(String s, Member member) {
return null;
}
});
serverList.remove(address);
}
if (members.isEmpty()) {
@ -355,14 +368,14 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
}
private void injectMembers4CP(Config config) {
final Member selfMember = self();
final Member selfMember = getSelf();
final String self = selfMember.getIp() + ":" + Integer.parseInt(String.valueOf(selfMember.getExtendVal(MemberMetaDataConstants.RAFT_PORT)));
Set<String> others = MemberUtils.toCPMembersInfo(allMembers());
config.setMembers(self, others);
}
private void injectMembers4AP(Config config) {
final String self = self().getAddress();
final String self = getSelf().getAddress();
Set<String> others = MemberUtils.toAPMembersInfo(allMembers());
config.setMembers(self, others);
}
@ -400,8 +413,8 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
pullTask.init();
broadcastTask.init();
GlobalExecutor.schedulePingJob(pingTask, 10_000L);
GlobalExecutor.schedulePullJob(pullTask, 10_000L);
GlobalExecutor.schedulePingJob(pingTask, 5_000L);
GlobalExecutor.schedulePullJob(pullTask, 5_000L);
GlobalExecutor.scheduleBroadCastJob(broadcastTask, 10_000L);
NotifyCenter.publishEvent(new ServerInitializedEvent((WebServerInitializedEvent) event, servletContext));
@ -442,6 +455,11 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
}
}
@VisibleForTesting
public void updateMember(Member member) {
serverList.put(member.getAddress(), member);
}
public Set<String> getMemberAddressInfos() {
return memberAddressInfos;
}
@ -451,7 +469,7 @@ public class ServerMemberManager implements SmartApplicationListener, Disposable
}
public Map<String, Member> getServerList() {
return serverList;
return Collections.unmodifiableMap(serverList);
}
public boolean isUseAddressServer() {
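The ServerMemberManager hunks above fold the member bookkeeping into a single computeIfAbsent whose mapping function records the address, so it only runs when the member is actually new, and allMembers()/getServerList() now hand out a copy or an unmodifiable view instead of the live map. A minimal, self-contained sketch of that pattern — the class and value types here are illustrative, not the Nacos ones:

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class MemberRegistrySketch {

    private final Map<String, String> serverList = new ConcurrentHashMap<>();
    private final Set<String> memberAddressInfos = ConcurrentHashMap.newKeySet();

    void memberJoin(String address, String member) {
        // The mapping function only runs when the key is absent, so the address
        // bookkeeping happens exactly once per newly created member.
        serverList.computeIfAbsent(address, addr -> {
            memberAddressInfos.add(addr);
            return member;
        });
    }

    Collection<String> allMembers() {
        // Hand out a copy so callers cannot affect the real data.
        return new ArrayList<>(serverList.values());
    }

    Map<String, String> getServerList() {
        return Collections.unmodifiableMap(serverList);
    }
}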

View File

@ -56,10 +56,11 @@ public class ClusterConfSyncTask extends Task {
private int maxFailCount = 12;
private volatile boolean alreadyLoadServer = false;
private final String url = InetUtils.getSelfIp() + ":" + memberManager.getPort() + "?" + ApplicationUtils
.getProperty("nacos.standalone.params", "");
private Runnable standaloneJob = () -> MemberUtils.readServerConf(Collections.singletonList(url), memberManager);
private Runnable standaloneJob = () -> {
String url = InetUtils.getSelfIp() + ":" + memberManager.getPort() + "?" + ApplicationUtils
.getProperty("nacos.standalone.params", "");
MemberUtils.readServerConf(Collections.singletonList(url), memberManager);
};
public ClusterConfSyncTask(final ServerMemberManager memberManager, final ServletContext context) {
super(memberManager);
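The task hunk above moves the URL construction from field initialization into the standalone Runnable, so the address and parameters are read when the job actually runs rather than when the task object is built. A tiny sketch of that deferred-evaluation pattern, with placeholder lookups in place of InetUtils/ApplicationUtils:

class DeferredUrlSketch {

    // Stand-ins for values that may not be ready when the task object is built.
    private static String currentIp() { return "127.0.0.1"; }
    private static int currentPort() { return 8848; }

    // Building the URL inside the lambda defers the lookup until the job runs.
    final Runnable standaloneJob = () -> {
        String url = currentIp() + ":" + currentPort();
        System.out.println("loading standalone member list from " + url);
    };
}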

View File

@ -40,8 +40,6 @@ import java.util.List;
*/
public class MemberDeadBroadcastTask extends Task {
private static final int MAX_FAIL_CNT = 6;
private final TypeReference<RestResult<String>> reference
= new TypeReference<RestResult<String>>() {
};
@ -53,19 +51,14 @@ public class MemberDeadBroadcastTask extends Task {
@Override
protected void executeBody() {
Collection<Member> members = memberManager.allMembers();
Collection<Member> waitRemove = new ArrayList<>();
members.forEach(member -> {
if (member.getState() == NodeState.DOWN) {
if (NodeState.DOWN.equals(member.getState())) {
waitRemove.add(member);
}
});
List<Member> waitBroad = MemberUtils.kRandom(memberManager, member -> {
NodeState state = member.getState();
return state != NodeState.DOWN;
});
List<Member> waitBroad = MemberUtils.kRandom(memberManager, member -> !NodeState.DOWN.equals(member.getState()));
for (Member member : waitBroad) {
final String url = HttpUtils.buildUrl(false, member.getAddress(),
@ -97,6 +90,6 @@ public class MemberDeadBroadcastTask extends Task {
@Override
protected void after() {
GlobalExecutor.scheduleBroadCastJob(this, 10_000L);
GlobalExecutor.scheduleBroadCastJob(this, 5_000L);
}
}

View File

@ -66,7 +66,7 @@ public class MemberPingTask extends Task {
TimerContext.start("MemberPingTask");
try {
final Member self = memberManager.self();
final Member self = memberManager.getSelf();
// self node information is not ready
if (!self.check()) {
return;

View File

@ -56,7 +56,7 @@ public class MemberShutdownTask extends Task {
@Override
public void executeBody() {
Collection<Member> body = Collections.singletonList(memberManager.self());
Collection<Member> body = Collections.singletonList(memberManager.getSelf());
Loggers.CLUSTER.info("Start broadcasting this node logout");

View File

@ -23,6 +23,7 @@ import com.alibaba.nacos.core.auth.Secured;
import com.alibaba.nacos.core.cluster.IsolationEvent;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.distributed.id.IdGeneratorManager;
import com.alibaba.nacos.core.notify.NotifyCenter;
@ -63,7 +64,7 @@ public class NacosClusterController {
@GetMapping(value = "/self")
public RestResult<Member> self() {
return RestResultUtils.success(memberManager.self());
return RestResultUtils.success(memberManager.getSelf());
}
@GetMapping(value = "/nodes")
@ -96,32 +97,32 @@ public class NacosClusterController {
@GetMapping("/server/health")
public RestResult<String> getHealth() {
return RestResultUtils.success("");
return RestResultUtils.success(memberManager.getSelf().getState().name());
}
@PostMapping("/server/report")
@PostMapping(value = {"/server/report", "/server/join"})
public RestResult<String> report(
@RequestBody(required = false) Member node,
@RequestParam(value = "sync") boolean sync) {
@RequestParam(value = "sync", required = false) boolean sync) {
if (!node.check()) {
return RestResultUtils.failedWithData("Node information is illegal");
}
Loggers.CLUSTER.debug("node state report, receive info : {}", node);
memberManager.update(node);
String data = "";
if (sync) {
data = JSON.toJSONString(MemberUtils.simpleMembers(memberManager));
}
Loggers.CLUSTER.debug("node state report, receive info : {}", node);
memberManager.update(node);
return RestResultUtils.success(data);
}
@PostMapping("/server/leave")
public RestResult<Boolean> memberLeave(@RequestBody Collection<Member> params) {
public RestResult<Boolean> leave(@RequestBody Collection<Member> params) {
memberManager.memberLeave(params);
return RestResultUtils.success();
}

View File

@ -28,8 +28,6 @@ import java.util.List;
import java.util.ServiceLoader;
import java.util.function.Supplier;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
@ -103,7 +101,7 @@ public class DistroMapper implements Mapper, MemberChangeListener {
}
}
final Member self = memberManager.self();
final Member self = memberManager.getSelf();
if (!ApplicationUtils.getProperty("nacos.core.distro.enable", Boolean.class, true)
|| ApplicationUtils.getStandaloneMode()) {
@ -128,7 +126,7 @@ public class DistroMapper implements Mapper, MemberChangeListener {
@Override
public String mapSrv(String key) {
final Member self = memberManager.self();
final Member self = memberManager.getSelf();
if (CollectionUtils.isEmpty(healthyList) ||
!ApplicationUtils

View File

@ -50,9 +50,7 @@ public class DefaultIdGenerator implements IdGenerator {
@Override
public void init() {
idStore = ApplicationUtils.getBean(DefaultIdStore.class);
// The first request requires an asynchronous request
idStore.firstAcquire(resource, Integer.MAX_VALUE, this, bufferIndex);
}

View File

@ -66,6 +66,7 @@ import org.springframework.stereotype.Component;
*
* @author <a href="mailto:liaochunyhm@live.com">liaochuntao</a>
*/
@SuppressWarnings("all")
@ConditionalOnProperty(value = "nacos.core.id-generator.type", havingValue = "default")
@Component
@DependsOn("serverMemberManager")
@ -77,7 +78,7 @@ public class DefaultIdStore extends LogProcessor4CP {
private final String FILE_PATH = Paths.get(ApplicationUtils.getNacosHome(), "data", "id_generator").toString();
private long ACQUIRE_STEP;
private CPProtocol cpProtocol;
private Map<String, IdStoreFile> storeFileMap;
private Map<String, IdStoreFile> storeFileMap = new ConcurrentHashMap<>(4);
private Serializer serializer;
private List<SnapshotOperation> snapshotOperations = Collections.singletonList(new IdSnapshotOperation());
@ -86,7 +87,6 @@ public class DefaultIdStore extends LogProcessor4CP {
@PostConstruct
protected void init() throws Exception {
Loggers.ID_GENERATOR.info("The Leaf-ID start");
this.storeFileMap = new ConcurrentHashMap<>(4);
this.serializer = SerializeFactory.getDefault();
ACQUIRE_STEP =
ConvertUtils.toLong(ApplicationUtils.getProperty("nacos.core.id-generator.default.acquire.step"), 100);
@ -106,12 +106,11 @@ public class DefaultIdStore extends LogProcessor4CP {
public void firstAcquire(String resource, int maxRetryCnt, DefaultIdGenerator generator, boolean bufferIndex) {
this.cpProtocol.protocolMetaData()
.subscribe(group(), Constants.LEADER_META_DATA, new Observer() {
@Override
public void update(Observable o, Object arg) {
GlobalExecutor.executeByCommon(() -> acquireNewIdSequence(resource, maxRetryCnt, generator, bufferIndex));
}
});
.subscribe(group(), Constants.LEADER_META_DATA, (o, arg) -> {
GlobalExecutor.executeByCommon(
() -> acquireNewIdSequence(resource, maxRetryCnt,
generator, bufferIndex));
});
}
public void acquireNewIdSequence(String resource, int maxRetryCnt, DefaultIdGenerator generator, boolean bufferIndex) {
@ -131,6 +130,7 @@ public class DefaultIdStore extends LogProcessor4CP {
// need read maxId from raft-leader
try {
long currentMaxId = Long.parseLong(protocol.getData(GetRequest.newBuilder()
.setGroup(group())
.setData(ByteString.copyFromUtf8(resource))
.build()).getData().toStringUtf8());
@ -177,10 +177,11 @@ public class DefaultIdStore extends LogProcessor4CP {
@Override
public LogFuture onApply(Log log) {
final AcquireId acquireId = serializer.deserialize(log.getData().toByteArray(), AcquireId.class);
final String resources = acquireId.getApplicant();
final String resource = acquireId.getApplicant();
final long minId = acquireId.getMinId();
final long maxId = acquireId.getMaxId();
IdStoreFile storeFile = storeFileMap.get(resources);
storeFileMap.computeIfAbsent(resource, s -> new IdStoreFile(resource));
IdStoreFile storeFile = storeFileMap.get(resource);
if (storeFile == null) {
return LogFuture.fail(new NoSuchElementException("The resource does not exist"));
}
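In DefaultIdStore the hunks initialize storeFileMap at the field declaration, replace the anonymous Observer with a lambda, and create the per-resource IdStoreFile on demand with computeIfAbsent before it is read in onApply. A small sketch of that on-demand creation, with a placeholder store type instead of the real IdStoreFile:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class IdStoreSketch {

    // Placeholder for IdStoreFile; only the creation pattern matters here.
    static final class StoreFile {
        final String resource;
        StoreFile(String resource) { this.resource = resource; }
    }

    // Initialized at declaration time instead of inside an init() method.
    private final Map<String, StoreFile> storeFileMap = new ConcurrentHashMap<>(4);

    StoreFile storeFor(String resource) {
        // Create the per-resource entry exactly once, then use it.
        storeFileMap.computeIfAbsent(resource, StoreFile::new);
        return storeFileMap.get(resource);
    }
}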

View File

@ -83,7 +83,7 @@ public class JRaftProtocol extends AbstractConsistencyProtocol<RaftConfig, LogPr
loadLogProcessor(config.listLogProcessor());
this.selfAddress = memberManager.self().getAddress();
this.selfAddress = memberManager.getSelf().getAddress();
NotifyCenter.registerPublisher(RaftEvent::new, RaftEvent.class);
NotifyCenter.registerPublisher(RaftErrorEvent::new, RaftErrorEvent.class);
@ -194,7 +194,7 @@ public class JRaftProtocol extends AbstractConsistencyProtocol<RaftConfig, LogPr
}
private void injectProtocolMetaData(ProtocolMetaData metaData) {
Member member = memberManager.self();
Member member = memberManager.getSelf();
member.setExtendVal("raft_meta_data", metaData);
}

View File

@ -403,7 +403,6 @@ public class JRaftServer {
Loggers.RAFT.warn("No RaftGroup information currently exists");
return;
}
String[] s = address.split(":");
final String ip = s[0].trim();
int port = Integer.parseInt(s[1].trim());
@ -412,7 +411,12 @@ public class JRaftServer {
final String groupId = entry.getKey();
final Configuration conf = RouteTable.getInstance().getConfiguration(groupId);
final PeerId peerId = new PeerId(ip, port);
peerIdChange(groupId, peerId, () -> cliService.removePeer(groupId, conf, peerId));
peerIdChange(groupId, peerId, () -> {
if (cliService == null) {
return new Status(RaftError.UNKNOWN, "cliService is null");
}
return cliService.removePeer(groupId, conf, peerId);
});
}
}
@ -420,21 +424,26 @@ public class JRaftServer {
final int retryCnt = failoverRetries > 1 ? failoverRetries : 3;
RaftExecutor.executeByRaftCore(() -> {
for (int i = 0; i < retryCnt; i++) {
if (isShutdown) {
return;
}
if (!conf.contains(peerId)) {
return;
}
Status status = callable.get();
if (status.isOk()) {
refreshRouteTable(groupId);
return;
}
else {
Loggers.RAFT
.error("Node remove failed, groupId : {}, peerId : {}, status : {}, Try again the {} time",
groupId, peerId, status, i + 1);
ThreadUtils.sleep(500L);
try {
Status status = callable.get();
if (status.isOk()) {
refreshRouteTable(groupId);
return;
}
else {
Loggers.RAFT.error("Node remove failed, groupId : {}, peerId : {}, status : {}, Try again the {} time",
groupId, peerId, status, i + 1);
ThreadUtils.sleep(500L);
}
} catch (Exception e) {
Loggers.RAFT.error("An exception occurred during the node change operation : {}", e);
}
}
});
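The JRaftServer hunk hardens peerIdChange: the retry loop now exits when the server is shutting down, an error Status is returned when cliService is unavailable, and each attempt is wrapped in a try/catch so a thrown exception only costs one retry instead of killing the task. A rough sketch of that retry shape — a boolean stands in for JRaft's Status, and all names here are illustrative:

import java.util.function.Supplier;

class PeerChangeRetrySketch {

    private volatile boolean isShutdown = false;

    // Retries the peer-change operation a bounded number of times; the boolean
    // result stands in for JRaft's Status.isOk().
    boolean changePeerWithRetry(Supplier<Boolean> operation, int retryCnt) {
        for (int i = 0; i < retryCnt; i++) {
            if (isShutdown) {
                return false;                  // stop retrying during shutdown
            }
            try {
                if (Boolean.TRUE.equals(operation.get())) {
                    return true;               // success: refresh the route table here
                }
                Thread.sleep(500L);            // back off before the next attempt
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                // A thrown exception only costs one retry, as in the patch.
                System.err.println("peer change attempt failed: " + e);
            }
        }
        return false;
    }
}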

View File

@ -124,6 +124,7 @@ public class StartingSpringApplicationRunListener implements SpringApplicationRu
LOGGER.error("Nacos failed to start, please see {}logs/nacos.log for more details.",
ApplicationUtils.getNacosHome());
context.close();
}
/**

View File

@ -19,6 +19,7 @@ package com.alibaba.nacos.core.notify;
import com.alibaba.nacos.common.utils.ShutdownUtils;
import com.alibaba.nacos.core.notify.listener.Subscribe;
import com.alibaba.nacos.core.utils.DisruptorFactory;
import com.alibaba.nacos.core.utils.Loggers;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
@ -47,12 +48,16 @@ public class NotifyCenter {
ShutdownUtils.addShutdownHook(new Thread(() -> {
System.out.println("[NotifyCenter] Start destroying Publisher");
PUBLISHER_MAP.forEach(new BiConsumer<String, Publisher>() {
@Override
public void accept(String s, Publisher publisher) {
publisher.shutdown();
}
});
try {
PUBLISHER_MAP.forEach(new BiConsumer<String, Publisher>() {
@Override
public void accept(String s, Publisher publisher) {
publisher.shutdown();
}
});
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("[NotifyCenter] Destruction of the end");
}));
@ -164,6 +169,7 @@ public class NotifyCenter {
private Disruptor<EventHandle> disruptor;
private Supplier<? extends Event> supplier;
private volatile boolean canOpen = false;
private volatile boolean shutdown = false;
public Publisher(final Class<? extends Event> eventType) {
this.eventType = eventType;
@ -193,7 +199,7 @@ public class NotifyCenter {
// waiting for the first Subscriber to register
for (; ; ) {
if (canOpen || stopDeferPublish) {
if (shutdown || canOpen || stopDeferPublish) {
break;
}
try {
@ -209,7 +215,11 @@ public class NotifyCenter {
if (Objects.nonNull(executor)) {
executor.execute(job);
} else {
job.run();
try {
job.run();
} catch (Exception e) {
Loggers.CORE.error("Event callback exception : {}", e);
}
}
}
}
@ -243,6 +253,7 @@ public class NotifyCenter {
void shutdown() {
if (disruptor != null) {
shutdown = true;
disruptor.shutdown();
}
}
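The NotifyCenter.Publisher hunks add a volatile shutdown flag that also breaks the wait-for-first-subscriber loop, and wrap direct subscriber callbacks in a try/catch so one failing listener cannot stop event dispatch. A minimal sketch of those two guards, with simplified names and a busy-wait in place of the Disruptor machinery:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class PublisherSketch {

    private volatile boolean canOpen = false;
    private volatile boolean shutdown = false;

    void awaitFirstSubscriber() {
        // Spin until a subscriber registers or the publisher is shut down;
        // the shutdown flag is the new exit condition added by the patch.
        while (!(shutdown || canOpen)) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
        }
    }

    void notifySubscriber(Runnable job) {
        try {
            job.run();
        } catch (Exception e) {
            // One misbehaving listener must not stop event dispatching.
            System.err.println("Event callback exception : " + e);
        }
    }

    void firstSubscriberRegistered() {
        canOpen = true;
    }

    void shutdown() {
        shutdown = true;
    }
}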

View File

@ -312,7 +312,7 @@ public class ApplicationUtils implements ApplicationContextInitializer<Configura
*/
public static final String LOCAL_IP = InetUtils.getSelfIp();
private static Boolean isStandalone = false;
private static Boolean isStandalone = null;
private static String functionModeType = null;

View File

@ -21,7 +21,6 @@ import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.stereotype.Component;
@ -55,7 +54,7 @@ public class RaftListener implements SmartApplicationListener {
RaftPeer peers = raftEvent.getRaftPeer();
String json = JSON.toJSONString(peers);
Map map = JSON.parseObject(json, HashMap.class);
Member self = memberManager.self();
Member self = memberManager.getSelf();
self.setExtendVal(GROUP, map);
memberManager.update(self);
}

File diff suppressed because it is too large

View File

@ -0,0 +1,452 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.test.config;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.config.server.service.PersistService;
import com.alibaba.nacos.config.server.service.transaction.DistributedDatabaseOperateImpl;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.consistency.cp.Constants;
import com.alibaba.nacos.core.distributed.id.IdGeneratorManager;
import com.alibaba.nacos.core.distributed.raft.utils.JRaftConstants;
import com.alibaba.nacos.core.utils.DiskUtils;
import com.alibaba.nacos.core.utils.InetUtils;
import com.alibaba.nacos.core.utils.ThreadUtils;
import com.alibaba.nacos.test.base.HttpClient4Test;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.web.context.support.StandardServletEnvironment;
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
public class ConfigDerbyRaft_ITCase
extends HttpClient4Test {
private static final String CONFIG_INFO_ID = "config-info-id";
private static final String CONFIG_HISTORY_ID = "config-history-id";
private static final String CONFIG_TAG_RELATION_ID = "config-tag-relation-id";
private static final String CONFIG_BETA_ID = "config-beta-id";
private static final String NAMESPACE_ID = "namespace-id";
private static final String USER_ID = "user-id";
private static final String ROLE_ID = "role-id";
private static final String PERMISSION_ID = "permissions_id";
private static String serverIp7 = "127.0.0.1:8847";
private static String serverIp8 = "127.0.0.1:8848";
private static String serverIp9 = "127.0.0.1:8849";
private static ConfigService iconfig7;
private static ConfigService iconfig8;
private static ConfigService iconfig9;
private static final AtomicBoolean[] finished = new AtomicBoolean[]{new AtomicBoolean(false), new AtomicBoolean(false), new AtomicBoolean(false)};
private static Map<String, ConfigurableApplicationContext> applications = new HashMap<>();
private static String clusterInfo;
static {
System.getProperties().setProperty("nacos.standalone", "false");
String ip = InetUtils.getSelfIp();
clusterInfo = "nacos.cluster=" + ip + ":8847?raft_port=8807," + ip
+ ":8848?raft_port=8808," + ip + ":8849?raft_port=8809";
}
@BeforeClass
public static void before() throws Exception {
System.getProperties().setProperty("nacos.core.auth.enabled", "false");
CountDownLatch latch = new CountDownLatch(3);
Runnable runnable = () -> {
for (int i = 0; i < 3; i++) {
try {
URL runnerUrl = new File("../console/target/classes").toURI().toURL();
URL[] urls = new URL[] { runnerUrl };
URLClassLoader cl = new URLClassLoader(urls);
Class<?> runnerClass = cl.loadClass("com.alibaba.nacos.Nacos");
run(i, latch, runnerClass);
} catch (Exception e) {
latch.countDown();
}
}
};
new Thread(runnable).start();
latch.await();
System.out.println("The cluster node initialization is complete");
Properties setting7 = new Properties();
setting7.put(PropertyKeyConst.SERVER_ADDR, serverIp7);
setting7.put(PropertyKeyConst.USERNAME, "nacos");
setting7.put(PropertyKeyConst.PASSWORD, "nacos");
iconfig7 = NacosFactory.createConfigService(setting7);
Properties setting8 = new Properties();
setting8.put(PropertyKeyConst.SERVER_ADDR, serverIp8);
setting8.put(PropertyKeyConst.USERNAME, "nacos");
setting8.put(PropertyKeyConst.PASSWORD, "nacos");
iconfig8 = NacosFactory.createConfigService(setting8);
Properties setting9 = new Properties();
setting9.put(PropertyKeyConst.SERVER_ADDR, serverIp9);
setting9.put(PropertyKeyConst.USERNAME, "nacos");
setting9.put(PropertyKeyConst.PASSWORD, "nacos");
iconfig9 = NacosFactory.createConfigService(setting9);
TimeUnit.SECONDS.sleep(20L);
}
@AfterClass
public static void after() throws Exception {
CountDownLatch latch = new CountDownLatch(applications.size());
for (ConfigurableApplicationContext context : applications.values()) {
new Thread(() -> {
try {
System.out.println("start close : " + context);
context.close();
} catch (Exception ignore) {
} finally {
System.out.println("finished close : " + context);
latch.countDown();
}
}).start();
}
latch.await();
}
@Test
public void test_a_publish_config() throws Exception {
boolean result = iconfig7.publishConfig("raft_test", "cluster_test_1",
"this.is.raft_cluster=lessspring_7");
Assert.assertTrue(result);
ThreadUtils.sleep(5000);
ConfigurableApplicationContext context7 = applications.get("8847");
ConfigurableApplicationContext context8 = applications.get("8848");
ConfigurableApplicationContext context9 = applications.get("8849");
PersistService operate7 = context7.getBean("persistService", PersistService.class);
PersistService operate8 = context8.getBean("persistService", PersistService.class);
PersistService operate9 = context9.getBean("persistService", PersistService.class);
String s7 = operate7.findConfigInfo("raft_test", "cluster_test_1", "").getContent();
String s8 = operate8.findConfigInfo("raft_test", "cluster_test_1", "").getContent();
String s9 = operate9.findConfigInfo("raft_test", "cluster_test_1", "").getContent();
Assert.assertArrayEquals("The three nodes must have consistent data",
new String[] { s7, s8, s9 },
new String[] { "this.is.raft_cluster=lessspring_7",
"this.is.raft_cluster=lessspring_7",
"this.is.raft_cluster=lessspring_7" });
}
@Test
public void test_b_publish_config() throws Exception {
boolean result = iconfig8.publishConfig("raft_test", "cluster_test_2",
"this.is.raft_cluster=lessspring_8");
Assert.assertTrue(result);
ThreadUtils.sleep(5000);
ConfigurableApplicationContext context7 = applications.get("8847");
ConfigurableApplicationContext context8 = applications.get("8848");
ConfigurableApplicationContext context9 = applications.get("8849");
PersistService operate7 = context7.getBean("persistService", PersistService.class);
PersistService operate8 = context8.getBean("persistService", PersistService.class);
PersistService operate9 = context9.getBean("persistService", PersistService.class);
String s7 = operate7.findConfigInfo("raft_test", "cluster_test_2", "").getContent();
String s8 = operate8.findConfigInfo("raft_test", "cluster_test_2", "").getContent();
String s9 = operate9.findConfigInfo("raft_test", "cluster_test_2", "").getContent();
Assert.assertArrayEquals("The three nodes must have consistent data",
new String[] { s7, s8, s9 },
new String[] { "this.is.raft_cluster=lessspring_8",
"this.is.raft_cluster=lessspring_8",
"this.is.raft_cluster=lessspring_8" });
}
@Test
public void test_c_publish_config() throws Exception {
boolean result = iconfig9.publishConfig("raft_test", "cluster_test_2",
"this.is.raft_cluster=lessspring_8");
Assert.assertTrue(result);
ThreadUtils.sleep(5000);
ConfigurableApplicationContext context7 = applications.get("8847");
ConfigurableApplicationContext context8 = applications.get("8848");
ConfigurableApplicationContext context9 = applications.get("8849");
PersistService operate7 = context7.getBean("persistService", PersistService.class);
PersistService operate8 = context8.getBean("persistService", PersistService.class);
PersistService operate9 = context9.getBean("persistService", PersistService.class);
String s7 = operate7.findConfigInfo("raft_test", "cluster_test_2", "").getContent();
String s8 = operate8.findConfigInfo("raft_test", "cluster_test_2", "").getContent();
String s9 = operate9.findConfigInfo("raft_test", "cluster_test_2", "").getContent();
Assert.assertArrayEquals("The three nodes must have consistent data",
new String[] { s7, s8, s9 },
new String[] { "this.is.raft_cluster=lessspring_8",
"this.is.raft_cluster=lessspring_8",
"this.is.raft_cluster=lessspring_8" });
}
@Test
public void test_d_modify_config() throws Exception {
boolean result = iconfig7.publishConfig("raft_test", "cluster_test_1",
"this.is.raft_cluster=lessspring_7_it_is_for_modify");
Assert.assertTrue(result);
ThreadUtils.sleep(5000);
ConfigurableApplicationContext context7 = applications.get("8847");
ConfigurableApplicationContext context8 = applications.get("8848");
ConfigurableApplicationContext context9 = applications.get("8849");
PersistService operate7 = context7.getBean("persistService", PersistService.class);
PersistService operate8 = context8.getBean("persistService", PersistService.class);
PersistService operate9 = context9.getBean("persistService", PersistService.class);
String s7 = operate7.findConfigInfo("raft_test", "cluster_test_1", "").getContent();
String s8 = operate8.findConfigInfo("raft_test", "cluster_test_1", "").getContent();
String s9 = operate9.findConfigInfo("raft_test", "cluster_test_1", "").getContent();
Assert.assertArrayEquals("The three nodes must have consistent data",
new String[] { s7, s8, s9 },
new String[] { "this.is.raft_cluster=lessspring_7_it_is_for_modify",
"this.is.raft_cluster=lessspring_7_it_is_for_modify",
"this.is.raft_cluster=lessspring_7_it_is_for_modify" });
}
@Test
public void test_e_id_generator() throws Exception {
ConfigurableApplicationContext context7 = applications.get("8847");
ConfigurableApplicationContext context8 = applications.get("8848");
ConfigurableApplicationContext context9 = applications.get("8849");
IdGeneratorManager manager7 = context7.getBean(IdGeneratorManager.class);
IdGeneratorManager manager8 = context8.getBean(IdGeneratorManager.class);
IdGeneratorManager manager9 = context9.getBean(IdGeneratorManager.class);
CPProtocol protocol7 = context7.getBean(CPProtocol.class);
CPProtocol protocol8 = context8.getBean(CPProtocol.class);
CPProtocol protocol9 = context9.getBean(CPProtocol.class);
final String configGroup = com.alibaba.nacos.config.server.constant.Constants.CONFIG_MODEL_RAFT_GROUP;
long configInfo7 = manager7.nextId(CONFIG_INFO_ID);
long configInfo8 = manager8.nextId(CONFIG_INFO_ID);
long configInfo9 = manager9.nextId(CONFIG_INFO_ID);
if (protocol7.isLeader(configGroup)) {
Assert.assertNotEquals(-1, configInfo7);
Assert.assertEquals(-1, configInfo8);
Assert.assertEquals(-1, configInfo9);
return;
}
if (protocol8.isLeader(configGroup)) {
Assert.assertEquals(-1, configInfo7);
Assert.assertNotEquals(-1, configInfo8);
Assert.assertEquals(-1, configInfo9);
return;
}
if (protocol9.isLeader(configGroup)) {
Assert.assertEquals(-1, configInfo7);
Assert.assertEquals(-1, configInfo8);
Assert.assertNotEquals(-1, configInfo9);
}
}
@Test
public void test_f_id_generator_leader_transfer() throws Exception {
ConfigurableApplicationContext context7 = applications.get("8847");
ConfigurableApplicationContext context8 = applications.get("8848");
ConfigurableApplicationContext context9 = applications.get("8849");
IdGeneratorManager manager7 = context7.getBean(IdGeneratorManager.class);
IdGeneratorManager manager8 = context8.getBean(IdGeneratorManager.class);
IdGeneratorManager manager9 = context9.getBean(IdGeneratorManager.class);
CPProtocol protocol7 = context7.getBean(CPProtocol.class);
CPProtocol protocol8 = context8.getBean(CPProtocol.class);
CPProtocol protocol9 = context9.getBean(CPProtocol.class);
final String configGroup = com.alibaba.nacos.config.server.constant.Constants.CONFIG_MODEL_RAFT_GROUP;
long preId = -1L;
long currentId = -1L;
if (protocol7.isLeader(configGroup)) {
preId = manager7.nextId(CONFIG_INFO_ID);
}
if (protocol8.isLeader(configGroup)) {
preId = manager8.nextId(CONFIG_INFO_ID);
}
if (protocol9.isLeader(configGroup)) {
preId = manager9.nextId(CONFIG_INFO_ID);
}
// transfer leader to ip:8807
Map<String, String> transfer = new HashMap<>();
transfer.put(JRaftConstants.TRANSFER_LEADER, InetUtils.getSelfIp() + ":8807");
RestResult<String> result = protocol7.execute(transfer);
Assert.assertTrue(result.ok());
System.out.println(result);
TimeUnit.SECONDS.sleep(2);
Assert.assertTrue(protocol7.isLeader(configGroup));
currentId = manager7.nextId(CONFIG_INFO_ID);
Assert.assertNotEquals(preId, currentId);
preId = currentId;
// transfer leader to ip:8808
transfer = new HashMap<>();
transfer.put(JRaftConstants.TRANSFER_LEADER, InetUtils.getSelfIp() + ":8808");
result = protocol8.execute(transfer);
Assert.assertTrue(result.ok());
System.out.println(result);
TimeUnit.SECONDS.sleep(2);
Assert.assertTrue(protocol8.isLeader(configGroup));
currentId = manager8.nextId(CONFIG_INFO_ID);
Assert.assertNotEquals(preId, currentId);
preId = currentId;
// transfer leader to ip:8809
transfer = new HashMap<>();
transfer.put(JRaftConstants.TRANSFER_LEADER, InetUtils.getSelfIp() + ":8809");
result = protocol9.execute(transfer);
Assert.assertTrue(result.ok());
System.out.println(result);
TimeUnit.SECONDS.sleep(2);
Assert.assertTrue(protocol9.isLeader(configGroup));
currentId = manager9.nextId(CONFIG_INFO_ID);
Assert.assertNotEquals(preId, currentId);
}
private static void run(final int index, CountDownLatch latch, Class<?> cls) {
Runnable runnable = () -> {
try {
DiskUtils.deleteDirectory(Paths.get(System.getProperty("user.home"),
"/nacos-" + index + "/").toString());
final String path = Paths.get(System.getProperty("user.home"), "/nacos-" + index + "/").toString();
DiskUtils.deleteDirectory(path);
Map<String, Object> properties = new HashMap<>();
properties.put("server.port", "884" + (7 + index));
properties.put("nacos.home",
Paths.get(System.getProperty("user.home"), "/nacos-" + index + "/").toString());
properties.put("nacos.logs.path",
Paths.get(System.getProperty("user.home"), "/nacos-" + index + "/logs/").toString());
properties.put("spring.jmx.enabled", false);
MapPropertySource propertySource = new MapPropertySource(
"nacos_cluster_test", properties);
ConfigurableEnvironment environment = new StandardServletEnvironment();
environment.getPropertySources().addFirst(propertySource);
SpringApplication cluster = new SpringApplicationBuilder(cls).web(
WebApplicationType.SERVLET).environment(environment)
.properties(clusterInfo).properties("embeddedDistributedStorage=true").build();
ConfigurableApplicationContext context = cluster.run();
context.stop();
DistributedDatabaseOperateImpl operate = context.getBean(DistributedDatabaseOperateImpl.class);
CPProtocol protocol = context.getBean(CPProtocol.class);
protocol.protocolMetaData()
.subscribe(operate.group(), Constants.LEADER_META_DATA, new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println("node : 884" + (7 + index) + "-> select leader is : " + arg);
if (finished[index].compareAndSet(false, true)) {
latch.countDown();
}
}
});
new Thread(() -> {
try {
Thread.sleep(5000L);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (finished[index].compareAndSet(false, true)) {
latch.countDown();
}
}
});
applications.put(String.valueOf(properties.get("server.port")), context);
}
catch (Exception e) {
e.printStackTrace();
latch.countDown();
}
};
runnable.run();
}
}

View File

@ -0,0 +1,159 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.test.core.cluster;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.Nacos;
import com.alibaba.nacos.common.http.HttpClientManager;
import com.alibaba.nacos.common.http.HttpUtils;
import com.alibaba.nacos.common.http.NSyncHttpClient;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.Commons;
import com.alibaba.nacos.core.utils.InetUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Nacos.class, properties = {"server.servlet.context-path=/nacos", "server.port=7001"},
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class ServerMemberManager_ITCase {
@Autowired
private ServerMemberManager memberManager;
@LocalServerPort
private int port;
private static NSyncHttpClient httpClient;
@Before
public void init() throws Exception {
TimeUnit.SECONDS.sleep(5L);
if (httpClient == null) {
httpClient = HttpClientManager.newHttpClient(ServerMemberManager_ITCase.class.getCanonicalName());
}
}
@After
public void after() throws Exception {
String url = HttpUtils.buildUrl(false, "localhost:" + memberManager.getSelf().getPort(),
memberManager.getContextPath(),
Commons.NACOS_CORE_CONTEXT,
"/cluster/server/leave");
RestResult<String> result = httpClient.post(url, Header.EMPTY, Query.EMPTY,
Collections.singletonList(Member
.builder()
.ip("1.1.1.1")
.port(80)
.build()),
new TypeReference<RestResult<String>>(){});
System.out.println(result);
System.out.println(memberManager.getServerList());
Assert.assertTrue(result.ok());
}
@Test
public void test_a_member_join_no_sync() throws Exception {
Collection<Member> members = memberManager.allMembers();
Assert.assertEquals(members.size(), 1);
Assert.assertEquals(InetUtils.getSelfIp() + ":" + port, memberManager.getSelf().getAddress());
RestResult<String> result = memberJoin();
Assert.assertTrue(result.ok());
members = memberManager.allMembers();
Assert.assertEquals(2, members.size());
members.removeIf(member -> memberManager.isSelf(member));
Assert.assertEquals(members.size(), 1);
Member newJoin = members.iterator().next();
Assert.assertEquals(newJoin.getIp(), "1.1.1.1");
Assert.assertEquals(newJoin.getPort(), 80);
}
@Test
public void test_b_member_join_with_sync() throws Exception {
Collection<Member> members = memberManager.allMembers();
Assert.assertEquals(1, members.size());
Assert.assertEquals(InetUtils.getSelfIp() + ":" + port, memberManager.getSelf().getAddress());
RestResult<String> result = memberJoin();
Assert.assertTrue(result.ok());
Collection<String> remoteServer = JSON.parseObject(result.getData(), new TypeReference<List<String>>(){});
Assert.assertTrue(StringUtils.startsWith(remoteServer.iterator().next(), memberManager.getSelf().getAddress()));
members = memberManager.allMembers();
Assert.assertEquals(2, members.size());
}
@Test
public void test_z_member_isolation() throws Exception {
String url = HttpUtils.buildUrl(false, "localhost:" + memberManager.getSelf().getPort(),
memberManager.getContextPath(),
Commons.NACOS_CORE_CONTEXT,
"/cluster/isolation");
RestResult<String> result = httpClient.post(url, Header.EMPTY, Query.EMPTY, null,
new TypeReference<RestResult<String>>(){});
Assert.assertTrue(result.ok());
url = HttpUtils.buildUrl(false, "localhost:" + memberManager.getSelf().getPort(),
memberManager.getContextPath(),
Commons.NACOS_CORE_CONTEXT,
"/cluster/server/health");
result = httpClient.get(url, Header.EMPTY, Query.EMPTY, new TypeReference<RestResult<String>>(){});
Assert.assertTrue(result.ok());
Assert.assertEquals(NodeState.ISOLATION.name(), result.getData());
}
private RestResult<String> memberJoin() throws Exception {
final String url = HttpUtils.buildUrl(false, "localhost:" + memberManager.getSelf().getPort(),
memberManager.getContextPath(),
Commons.NACOS_CORE_CONTEXT,
"/cluster/server/report");
return httpClient.post(url, Header.EMPTY, Query.newInstance().addParam("sync", true), Member
.builder()
.ip("1.1.1.1")
.port(80)
.build(),
new TypeReference<RestResult<String>>(){});
}
}

View File

@ -1,12 +1,94 @@
# spring
#*************** Spring Boot Related Configurations ***************#
### Default web context path:
server.servlet.contextPath=/nacos
### Default web server port:
server.port=8848
nacos.standalone=true
#*************** Network Related Configurations ***************#
### If prefer hostname over ip for Nacos server addresses in cluster.conf:
# nacos.inetutils.prefer-hostname-over-ip=false
### Specify local server's IP:
# nacos.inetutils.ip-address=
#*************** Core Related Configurations ***************#
### Whether to turn on inter-member discovery. If this is enabled, the cluster.conf
### of a newly added member only needs to contain one member of the known cluster to build the discovery channel
nacos.core.member.self-discovery=true
### Which Nacos embedded distributed ID generator is turned on;
### if an external implementation is provided, the external implementation is automatically selected
nacos.core.id-generator.type=default
### The step size for each fetch of the embedded distributed ID
nacos.core.id-generator.default.acquire.step=100
### If nacos.core.idGenerator.type=snakeflower, You need to set the dataCenterID manually
# nacos.core.snowflake.data-center=
### If nacos.core.idGenerator.type=snakeflower, You need to set the WorkerID manually
# nacos.core.snowflake.worker-id=
#*************** Config Module Related Configurations ***************#
### If using MySQL as the datasource:
# spring.datasource.platform=mysql
### Open circuit
# nacos.config.open-circuit=false
### Count of DB:
# db.num=1
### Connect URL of DB:
# db.url.0=jdbc:mysql://1.1.1.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
# db.user=user
# db.password=password
#*************** Naming Module Related Configurations ***************#
### Data dispatch task execution period in milliseconds:
# nacos.naming.distro.taskDispatchPeriod=200
### Data count of batch sync task:
# nacos.naming.distro.batchSyncKeyCount=1000
### Retry delay in milliseconds if sync task failed:
# nacos.naming.distro.syncRetryDelay=5000
### If enable data warmup. If set to false, the server would accept request without local data preparation:
# nacos.naming.data.warmup=true
### If enable the instance auto expiration, kind like of health check of instance:
# nacos.naming.expireInstance=true
nacos.naming.empty-service.auto-clean=true
nacos.naming.empty-service.clean.initial-delay-ms=50000
nacos.naming.empty-service.clean.period-time-ms=30000
#*************** CMDB Module Related Configurations ***************#
### The interval to dump external CMDB in seconds:
# nacos.cmdb.dumpTaskInterval=3600
### The interval of polling data change event in seconds:
# nacos.cmdb.eventTaskInterval=10
### The interval of loading labels in seconds:
# nacos.cmdb.labelTaskInterval=300
### If turn on data loading task:
# nacos.cmdb.loadDataAtStart=false
#*************** Metrics Related Configurations ***************#
### Metrics for prometheus
#management.endpoints.web.exposure.include=*
### Metrics for elastic search
management.metrics.export.elastic.enabled=false
#management.metrics.export.elastic.host=http://localhost:9200
# metrics for influx
### Metrics for influx
management.metrics.export.influx.enabled=false
#management.metrics.export.influx.db=springboot
#management.metrics.export.influx.uri=http://localhost:8086
@ -14,11 +96,22 @@ management.metrics.export.influx.enabled=false
#management.metrics.export.influx.consistency=one
#management.metrics.export.influx.compressed=true
#*************** Access Log Related Configurations ***************#
### If turn on the access log:
server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D
# default current work dir
### The access log pattern:
server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D %{User-Agent}i
### The directory of access log:
server.tomcat.basedir=
#*************** Access Control Related Configurations ***************#
### If enable spring security, this option is deprecated in 1.2.0:
#spring.security.enabled=false
### The ignore urls of auth, is deprecated in 1.2.0:
nacos.security.ignore.urls=/,/error,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/v1/auth/**,/v1/console/health/**,/actuator/**,/v1/console/server/**
@ -28,12 +121,79 @@ nacos.core.auth.system.type=nacos
### If turn on auth system:
nacos.core.auth.enabled=false
nacos.core.auth.caching.enabled=false
### The token expiration in seconds:
nacos.core.auth.default.token.expire.seconds=18000
### The default token:
nacos.core.auth.default.token.secret.key=SecretKey012345678901234567890123456789012345678901234567890123456789
tldSkipPatterns=derbyLocale_*.jar,jaxb-api.jar,jsr173_1.0_api.jar,jaxb1-impl.jar,activation.jar
### Turn on/off caching of auth information. By turning on this switch, the update of auth information would have a 15 seconds delay.
nacos.core.auth.caching.enabled=true
#*************** Istio Related Configurations ***************#
### If turn on the MCP server:
nacos.istio.mcp.server.enabled=false
#*************** Embed Storage Related Configurations ***************#
### Whether to open embedded distributed storage in nacos cluster mode
embeddedDistributedStorage=true
#*************** Consistency Related Configurations ***************#
# About Raft
### Sets the Raft cluster election timeout, default value is 5 seconds
nacos.core.protocol.raft.data.election_timeout_ms=5000
### Sets how often the Raft snapshot is executed periodically, default is 30 minutes
nacos.core.protocol.raft.data.snapshot_interval_secs=30
### Requested retries, default value is 1
nacos.core.protocol.raft.data.request_failoverRetries=1
### raft internal worker threads
nacos.core.protocol.raft.data.core_thread_num=8
### Number of threads required for raft business request processing
nacos.core.protocol.raft.data.cli_service_thread_num=4
### raft linear read strategy, defaults to index
nacos.core.protocol.raft.data.read_index_type=ReadOnlySafe
### rpc request timeout, default 5 seconds
nacos.core.protocol.raft.data.rpc_request_timeout_ms=5000
### Maximum size of each file RPC (snapshot copy) request between members, default is 128 K
nacos.core.protocol.raft.data.max_byte_count_per_rpc=131072
### Maximum number of logs sent from leader to follower, default is 1024
nacos.core.protocol.raft.data.max_entries_size=1024
### Maximum body size for sending logs from leader to follower, default is 512K
nacos.core.protocol.raft.data.max_body_size=524288
### Maximum log storage buffer size, default 256K
nacos.core.protocol.raft.data.max_append_buffer_size=262144
### Election timer interval will be a random maximum outside the specified time, default is 1 second
nacos.core.protocol.raft.data.max_election_delay_ms=1000
### Specify the ratio between election timeout and heartbeat interval. Heartbeat interval is equal to
### electionTimeoutMs/electionHeartbeatFactor, one tenth by default.
nacos.core.protocol.raft.data.election_heartbeat_factor=10
### The tasks submitted to the leader accumulate the maximum batch size of a batch flush log storage. The default is 32 tasks.
nacos.core.protocol.raft.data.apply_batch=32
### Call fsync when necessary when writing logs and meta information, usually should be true
nacos.core.protocol.raft.data.sync=true
### Whether to write snapshot / raft meta-information to call fsync. The default is false. When sync is true, it is preferred to respect sync.
nacos.core.protocol.raft.data.sync_meta=false
### Internal disruptor buffer size. For applications with high write throughput, you need to increase this value. The default value is 16384.
nacos.core.protocol.raft.data.disruptor_buffer_size=16384
### Whether to enable replication of pipeline request optimization, which is enabled by default
nacos.core.protocol.raft.data.replicator_pipeline=true
### Maximum number of in-flight requests with pipeline requests enabled, default is 256
nacos.core.protocol.raft.data.max_replicator_inflight_msgs=256
### Whether to enable LogEntry checksum
nacos.core.protocol.raft.data.enable_log_entry_checksum=false
# About Distro
### Maximum interval between two data transmissions
nacos.core.protocol.distro.data.task_dispatch_period_ms=2000
### Number of keys per batch of tasks
nacos.core.protocol.distro.data.batch_sync_key_count=1000
### Task retry delay time
nacos.core.protocol.distro.data.sync_retry_delay_ms=5000
### Whether to enable the authoritative server mechanism
nacos.core.protocol.distro.data.distro_enabled=true
### Data synchronization retry strategy
nacos.core.protocol.distro.data.retry_policy=simple
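The new application.properties block documents the raft and distro tuning keys together with their defaults. As a tiny, hedged illustration of consuming one of these keys with a fallback, using plain java.util.Properties rather than the Nacos ApplicationUtils helper seen elsewhere in this commit:

import java.util.Properties;

class RaftConfigSketch {

    static long electionTimeoutMs(Properties props) {
        // Falls back to the documented default of 5000 ms when the key is absent.
        return Long.parseLong(props.getProperty(
                "nacos.core.protocol.raft.data.election_timeout_ms", "5000"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("nacos.core.protocol.raft.data.election_timeout_ms", "7500");
        System.out.println(electionTimeoutMs(props)); // prints 7500
    }
}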