Merge branch 'develop' of https://github.com/alibaba/nacos into new_develop

This commit is contained in:
chuntaojun 2020-06-24 15:52:09 +08:00
commit 3d8b55cca0
12 changed files with 301 additions and 148 deletions

View File

@ -137,7 +137,8 @@ public class Member implements Comparable<Member>, Cloneable {
@Override
public String toString() {
return "Member{" + "address='" + getAddress() + '\'' + '}';
return "Member{" + "ip='" + ip + '\'' + ", port=" + port + ", state=" + state + ", extendInfo=" + extendInfo
+ '}';
}
@Override

View File

@ -24,16 +24,16 @@ import com.alibaba.nacos.core.notify.listener.Subscribe;
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public interface MemberChangeListener extends Subscribe<MemberChangeEvent> {
public interface MemberChangeListener extends Subscribe<MembersChangeEvent> {
/**
* return NodeChangeEvent.class info.
*
* @return {@link MemberChangeEvent#getClass()}
* @return {@link MembersChangeEvent#getClass()}
*/
@Override
default Class<? extends Event> subscribeType() {
return MemberChangeEvent.class;
return MembersChangeEvent.class;
}
/**

View File

@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Member node tool class.
@ -208,6 +209,10 @@ public class MemberUtils {
return nodes;
}
public static Set<Member> selectTargetMembers(Collection<Member> members, Predicate<Member> filter) {
return members.stream().filter(filter).collect(Collectors.toSet());
}
public static List<String> simpleMembers(Collection<Member> members) {
return members.stream().map(Member::getAddress).sorted()
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

View File

@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class MemberChangeEvent implements Event {
public class MembersChangeEvent implements Event {
private static final AtomicLong SEQUENCE = new AtomicLong(0);
@ -59,6 +59,11 @@ public class MemberChangeEvent implements Event {
return no;
}
@Override
public String toString() {
return "MembersChangeEvent{" + "members=" + members + ", no=" + no + '}';
}
public static final class MemberChangeEventBuilder {
private Collection<Member> allMembers;
@ -74,12 +79,12 @@ public class MemberChangeEvent implements Event {
/**
* build MemberChangeEvent.
*
* @return {@link MemberChangeEvent}
* @return {@link MembersChangeEvent}
*/
public MemberChangeEvent build() {
MemberChangeEvent memberChangeEvent = new MemberChangeEvent();
memberChangeEvent.setMembers(allMembers);
return memberChangeEvent;
public MembersChangeEvent build() {
MembersChangeEvent membersChangeEvent = new MembersChangeEvent();
membersChangeEvent.setMembers(allMembers);
return membersChangeEvent;
}
}
}

View File

@ -56,7 +56,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.BiFunction;
/**
* Cluster node management in Nacos.
@ -77,53 +76,52 @@ import java.util.function.BiFunction;
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@Component(value = "serverMemberManager")
@SuppressWarnings("all")
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {
private final NAsyncHttpClient asyncHttpClient = HttpClientManager.getAsyncHttpClient();
/**
* Cluster node list
* Cluster node list.
*/
private volatile ConcurrentSkipListMap<String, Member> serverList;
/**
* Is this node in the cluster list
* Is this node in the cluster list.
*/
private volatile boolean isInIpList = true;
/**
* port
* port.
*/
private int port;
/**
* Address information for the local node
* Address information for the local node.
*/
private String localAddress;
/**
* Addressing pattern instances
* Addressing pattern instances.
*/
private MemberLookup lookup;
/**
* self member obj
* self member obj.
*/
private volatile Member self;
/**
* here is always the node information of the "UP" state
* here is always the node information of the "UP" state.
*/
private volatile Set<String> memberAddressInfos = new ConcurrentHashSet<>();
/**
* Broadcast this node element information task
* Broadcast this node element information task.
*/
private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask();
public ServerMemberManager(ServletContext servletContext) throws Exception {
this.serverList = new ConcurrentSkipListMap();
this.serverList = new ConcurrentSkipListMap<>();
ApplicationUtils.setContextPath(servletContext.getContextPath());
MemberUtils.setManager(this);
@ -156,14 +154,14 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
this.lookup.start();
}
public void swithLookup(String name) throws NacosException {
public void switchLookup(String name) throws NacosException {
this.lookup = LookupFactory.switchLookup(name, this);
this.lookup.start();
}
private void registerClusterEvent() {
// Register node change events
NotifyCenter.registerToPublisher(MemberChangeEvent.class,
NotifyCenter.registerToPublisher(MembersChangeEvent.class,
ApplicationUtils.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));
// The address information of this node needs to be dynamically modified
@ -193,8 +191,14 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
});
}
/**
* member information update.
*
* @param newMember {@link Member}
* @return update is success
*/
public boolean update(Member newMember) {
Loggers.CLUSTER.debug("Node information update : {}", newMember);
Loggers.CLUSTER.debug("member information update : {}", newMember);
String address = newMember.getAddress();
newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());
@ -202,19 +206,29 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
if (!serverList.containsKey(address)) {
return false;
}
serverList.computeIfPresent(address, new BiFunction<String, Member, Member>() {
@Override
public Member apply(String s, Member member) {
if (!NodeState.UP.equals(newMember.getState())) {
serverList.computeIfPresent(address, (s, member) -> {
if (NodeState.DOWN.equals(newMember.getState())) {
memberAddressInfos.remove(newMember.getAddress());
}
MemberUtils.copy(newMember, member);
return member;
}
});
// member data changes and all listeners need to be notified
NotifyCenter.publishEvent(MembersChangeEvent.builder()
.members(allMembers())
.build());
return true;
}
/**
* Whether the node exists within the cluster.
*
* @param address ip:port
* @return is exist
*/
public boolean hasMember(String address) {
boolean result = serverList.containsKey(address);
if (!result) {
@ -237,6 +251,11 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
return serverList.get(address);
}
/**
* return this cluster all members.
*
* @return {@link Collection} all member
*/
public Collection<Member> allMembers() {
// We need to do a copy to avoid affecting the real data
HashSet<Member> set = new HashSet<>(serverList.values());
@ -244,6 +263,11 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
return set;
}
/**
* return this cluster all members without self.
*
* @return {@link Collection} all member without self
*/
public List<Member> allMembersWithoutSelf() {
List<Member> members = new ArrayList<>(serverList.values());
members.remove(self);
@ -273,7 +297,7 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
// are involved and all recipients need to be notified of the node change event
boolean hasChange = members.size() != serverList.size();
ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap();
ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
for (Member member : members) {
final String address = member.getAddress();
@ -287,8 +311,8 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
tmpAddressInfo.add(address);
}
Map oldList = serverList;
Set oldSet = memberAddressInfos;
Map<String, Member> oldList = serverList;
Set<String> oldSet = memberAddressInfos;
serverList = tmpMap;
memberAddressInfos = tmpAddressInfo;
@ -305,25 +329,46 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
// that the event publication is sequential
if (hasChange) {
MemberUtils.syncToFile(finalMembers);
Event event = MemberChangeEvent.builder().members(finalMembers).build();
Set<Member> healthMembers = MemberUtils.selectTargetMembers(members, member -> {
return !NodeState.DOWN.equals(member.getState());
});
Event event = MembersChangeEvent.builder().members(finalMembers).build();
NotifyCenter.publishEvent(event);
}
return hasChange;
}
/**
* members join this cluster.
*
* @param members {@link Collection} new members
* @return is success
*/
public synchronized boolean memberJoin(Collection<Member> members) {
Set<Member> set = new HashSet<>(members);
set.addAll(allMembers());
return memberChange(set);
}
/**
* members leave this cluster.
*
* @param members {@link Collection} wait leave members
* @return is success
*/
public synchronized boolean memberLeave(Collection<Member> members) {
Set<Member> set = new HashSet<>(allMembers());
set.removeAll(members);
return memberChange(set);
}
/**
* this member {@link Member#getState()} is health.
*
* @param address ip:port
* @return is health
*/
public boolean isUnHealth(String address) {
Member member = serverList.get(address);
if (member == null) {
@ -347,6 +392,11 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
Loggers.CLUSTER.info("This node is ready to provide external services");
}
/**
* ServerMemberManager shutdown.
*
* @throws NacosException NacosException
*/
@PreDestroy
public void shutdown() throws NacosException {
serverList.clear();
@ -369,6 +419,11 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
this.memberAddressInfos = memberAddressInfos;
}
@JustForTest
public MemberInfoReportTask getInfoReportTask() {
return infoReportTask;
}
public Map<String, Member> getServerList() {
return Collections.unmodifiableMap(serverList);
}

View File

@ -134,7 +134,7 @@ public class NacosClusterController {
@PostMapping(value = "/switch/lookup")
public RestResult<String> switchLookup(@RequestParam(name = "type") String type) {
try {
memberManager.swithLookup(type);
memberManager.switchLookup(type);
return RestResultUtils.success();
} catch (Throwable ex) {
return RestResultUtils.failed(ex.getMessage());

View File

@ -20,7 +20,7 @@ import com.alibaba.nacos.consistency.Config;
import com.alibaba.nacos.consistency.ap.APProtocol;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeEvent;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.MemberUtils;
@ -62,6 +62,8 @@ public class ProtocolManager implements ApplicationListener<ContextStartedEvent>
private boolean cpInit = false;
private Set<Member> oldMembers;
private static Set<String> toAPMembersInfo(Collection<Member> members) {
Set<String> nodes = new HashSet<>();
members.forEach(member -> nodes.add(member.getAddress()));
@ -154,14 +156,20 @@ public class ProtocolManager implements ApplicationListener<ContextStartedEvent>
}
@Override
public void onEvent(MemberChangeEvent event) {
public void onEvent(MembersChangeEvent event) {
// Here, the sequence of node change events is very important. For example,
// node change event A occurs at time T1, and node change event B occurs at
// time T2 after a period of time.
// (T1 < T2)
Set<Member> copy = new HashSet<>(event.getMembers());
if (oldMembers == null) {
oldMembers = new HashSet<>(copy);
} else {
oldMembers.removeAll(copy);
}
if (!oldMembers.isEmpty()) {
// Node change events between different protocols should not block each other.
// and we use a single thread pool to inform the consistency layer of node changes,
// to avoid multiple tasks simultaneously carrying out the consistency layer of
@ -173,4 +181,9 @@ public class ProtocolManager implements ApplicationListener<ContextStartedEvent>
ProtocolExecutor.cpMemberChange(() -> cpProtocol.memberChange(toCPMembersInfo(copy)));
}
}
// remove old members info
oldMembers.clear();
oldMembers.addAll(copy);
}
}

View File

@ -18,7 +18,7 @@ package com.alibaba.nacos.naming.cluster;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeEvent;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.NodeState;
@ -87,7 +87,7 @@ public class ServerListManager implements MemberChangeListener {
*/
public boolean contains(String serverAddress) {
for (Member server : getServers()) {
if (Objects.equals(server, server.getAddress())) {
if (Objects.equals(serverAddress, server.getAddress())) {
return true;
}
}
@ -99,7 +99,7 @@ public class ServerListManager implements MemberChangeListener {
}
@Override
public void onEvent(MemberChangeEvent event) {
public void onEvent(MembersChangeEvent event) {
this.servers = new ArrayList<>(event.getMembers());
}

View File

@ -19,7 +19,7 @@ package com.alibaba.nacos.naming.consistency.persistent.raft;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberChangeEvent;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.utils.ApplicationUtils;
@ -67,6 +67,8 @@ public class RaftPeerSet implements MemberChangeListener {
private volatile boolean ready = false;
private Set<Member> oldMembers;
public RaftPeerSet(ServerMemberManager memberManager) {
this.memberManager = memberManager;
}
@ -299,8 +301,20 @@ public class RaftPeerSet implements MemberChangeListener {
}
@Override
public void onEvent(MemberChangeEvent event) {
changePeers(event.getMembers());
public void onEvent(MembersChangeEvent event) {
Collection<Member> members = event.getMembers();
if (oldMembers == null) {
oldMembers = new HashSet<>(members);
} else {
oldMembers.removeAll(members);
}
if (!oldMembers.isEmpty()) {
changePeers(members);
}
oldMembers.clear();
oldMembers.addAll(members);
}
private void changePeers(Collection<Member> members) {

View File

@ -17,8 +17,9 @@
package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberChangeEvent;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.utils.ApplicationUtils;
@ -129,14 +130,16 @@ public class DistroMapper implements MemberChangeListener {
}
@Override
public void onEvent(MemberChangeEvent event) {
public void onEvent(MembersChangeEvent event) {
// Here, the node list must be sorted to ensure that all nacos-server's
// node list is in the same order
List<String> list = MemberUtils.simpleMembers(event.getMembers());
List<String> list = MemberUtils.simpleMembers(MemberUtils
.selectTargetMembers(event.getMembers(), member -> !NodeState.DOWN.equals(member.getState())));
Collections.sort(list);
Collection<String> old = healthyList;
healthyList = Collections.unmodifiableList(list);
Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, old: {}, new: {}", old, healthyList);
old.clear();
}
@Override

View File

@ -17,16 +17,20 @@
package com.alibaba.nacos.test.core.cluster;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeEvent;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.notify.Event;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.notify.listener.Subscribe;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Constants;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
@ -36,13 +40,19 @@ import org.springframework.mock.web.MockServletContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* Cluster node manages unit tests.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@ -50,19 +60,23 @@ public class ServerMemberManager_ITCase {
private ServerMemberManager memberManager;
{
try {
memberManager = new ServerMemberManager(new MockServletContext());
}
catch (Exception e) {
e.printStackTrace();
@BeforeClass
public static void initClass() throws Exception {
System.setProperty(Constants.NACOS_SERVER_IP, "127.0.0.1");
System.setProperty("server.port", "8847");
ApplicationUtils.setIsStandalone(true);
ApplicationUtils.injectEnvironment(new StandardEnvironment());
}
@AfterClass
public static void destroyClass() {
System.clearProperty(Constants.NACOS_SERVER_IP);
System.clearProperty("server.port");
}
@Before
public void init() throws Exception {
ApplicationUtils.setIsStandalone(true);
ApplicationUtils.injectEnvironment(new StandardEnvironment());
public void before() throws Exception {
memberManager = new ServerMemberManager(new MockServletContext());
}
@After
@ -71,23 +85,15 @@ public class ServerMemberManager_ITCase {
}
@Test
public void test_k_isFirst() {
public void testKisFirst() {
String firstIp = "127.0.0.1:8847";
String secondIp = "127.0.0.1:8847";
String thirdIp = "127.0.0.1:8847";
ConcurrentSkipListMap<String, Member> map = new ConcurrentSkipListMap<>();
map.put(secondIp, Member.builder()
.ip("127.0.0.1")
.port(8847)
.build());
map.put(firstIp, Member.builder()
.ip("127.0.0.1")
.port(8848)
.build());
map.put(thirdIp, Member.builder()
.ip("127.0.0.1")
.port(8849)
.build());
String secondIp = "127.0.0.1:8848";
String thirdIp = "127.0.0.1:8849";
Map<String, Member> map = new HashMap<>(4);
map.put(firstIp, Member.builder().ip("127.0.0.1").port(8847).state(NodeState.UP).build());
map.put(secondIp, Member.builder().ip("127.0.0.1").port(8848).state(NodeState.UP).build());
map.put(thirdIp, Member.builder().ip("127.0.0.1").port(8849).state(NodeState.UP).build());
List<Member> members = new ArrayList<Member>(map.values());
Collections.sort(members);
@ -97,21 +103,21 @@ public class ServerMemberManager_ITCase {
}
@Test
public void test_a_member_change() throws Exception {
public void testMemberChange() throws Exception {
AtomicInteger integer = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
NotifyCenter.registerSubscribe(new Subscribe<MemberChangeEvent>() {
NotifyCenter.registerSubscribe(new Subscribe<MembersChangeEvent>() {
@Override
public void onEvent(MemberChangeEvent event) {
public void onEvent(MembersChangeEvent event) {
integer.incrementAndGet();
latch.countDown();
}
@Override
public Class<? extends Event> subscribeType() {
return MemberChangeEvent.class;
return MembersChangeEvent.class;
}
});
Collection<Member> members = memberManager.allMembers();
@ -120,10 +126,7 @@ public class ServerMemberManager_ITCase {
memberManager.memberJoin(members);
members.add(Member.builder()
.ip("115.159.3.213")
.port(8848)
.build());
members.add(Member.builder().ip("115.159.3.213").port(8848).build());
boolean changed = memberManager.memberJoin(members);
Assert.assertTrue(changed);
@ -133,4 +136,58 @@ public class ServerMemberManager_ITCase {
Assert.assertEquals(1, integer.get());
}
@Test
public void testMemberHealthCheck() throws Exception {
AtomicReference<Collection<Member>> healthMembers = new AtomicReference<>();
CountDownLatch first = new CountDownLatch(1);
CountDownLatch second = new CountDownLatch(1);
NotifyCenter.registerSubscribe(new Subscribe<MembersChangeEvent>() {
@Override
public void onEvent(MembersChangeEvent event) {
System.out.println(event);
healthMembers.set(MemberUtils.selectTargetMembers(event.getMembers(), member -> !NodeState.DOWN.equals(member.getState())));
if (first.getCount() == 1) {
first.countDown();
return;
}
if (second.getCount() == 1) {
second.countDown();
}
}
@Override
public Class<? extends Event> subscribeType() {
return MembersChangeEvent.class;
}
});
String firstIp = "127.0.0.1:8847";
String secondIp = "127.0.0.1:8848";
String thirdIp = "127.0.0.1:8849";
Map<String, Member> map = new HashMap<>(4);
map.put(firstIp, Member.builder().ip("127.0.0.1").port(8847).state(NodeState.UP).build());
map.put(secondIp, Member.builder().ip("127.0.0.1").port(8848).state(NodeState.UP).build());
map.put(thirdIp, Member.builder().ip("127.0.0.1").port(8849).state(NodeState.UP).build());
Set<Member> firstMemberList = new HashSet<>(map.values());
memberManager.memberJoin(map.values());
first.await();
Set<Member> copy = new HashSet<>(firstMemberList);
copy.removeAll(healthMembers.get());
Assert.assertTrue(copy.isEmpty());
Member member = map.get(firstIp);
member.setState(NodeState.DOWN);
Assert.assertTrue(memberManager.update(member));
second.await();
copy = new HashSet<>(firstMemberList);
copy.removeAll(healthMembers.get());
Assert.assertEquals(1, copy.size());
Assert.assertTrue(copy.contains(map.get(firstIp)));
}
}