diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/Member.java b/core/src/main/java/com/alibaba/nacos/core/cluster/Member.java index 98a9837b0..83040274f 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/Member.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/Member.java @@ -137,7 +137,8 @@ public class Member implements Comparable, Cloneable { @Override public String toString() { - return "Member{" + "address='" + getAddress() + '\'' + '}'; + return "Member{" + "ip='" + ip + '\'' + ", port=" + port + ", state=" + state + ", extendInfo=" + extendInfo + + '}'; } @Override diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeListener.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeListener.java index 0b401441e..46b114d82 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeListener.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeListener.java @@ -24,16 +24,16 @@ import com.alibaba.nacos.core.notify.listener.Subscribe; * * @author liaochuntao */ -public interface MemberChangeListener extends Subscribe { +public interface MemberChangeListener extends Subscribe { /** * return NodeChangeEvent.class info. * - * @return {@link MemberChangeEvent#getClass()} + * @return {@link MembersChangeEvent#getClass()} */ @Override default Class subscribeType() { - return MemberChangeEvent.class; + return MembersChangeEvent.class; } /** diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java index a0f77ea7f..b089d9c39 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; +import java.util.stream.Collectors; /** * Member node tool class. @@ -208,6 +209,10 @@ public class MemberUtils { return nodes; } + public static Set selectTargetMembers(Collection members, Predicate filter) { + return members.stream().filter(filter).collect(Collectors.toSet()); + } + public static List simpleMembers(Collection members) { return members.stream().map(Member::getAddress).sorted() .collect(ArrayList::new, ArrayList::add, ArrayList::addAll); diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeEvent.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MembersChangeEvent.java similarity index 83% rename from core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeEvent.java rename to core/src/main/java/com/alibaba/nacos/core/cluster/MembersChangeEvent.java index 3f7e13ea7..ffc0d69e8 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberChangeEvent.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MembersChangeEvent.java @@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicLong; * * @author liaochuntao */ -public class MemberChangeEvent implements Event { +public class MembersChangeEvent implements Event { private static final AtomicLong SEQUENCE = new AtomicLong(0); @@ -59,6 +59,11 @@ public class MemberChangeEvent implements Event { return no; } + @Override + public String toString() { + return "MembersChangeEvent{" + "members=" + members + ", no=" + no + '}'; + } + public static final class MemberChangeEventBuilder { private Collection allMembers; @@ -74,12 +79,12 @@ public class MemberChangeEvent implements Event { /** * build MemberChangeEvent. * - * @return {@link MemberChangeEvent} + * @return {@link MembersChangeEvent} */ - public MemberChangeEvent build() { - MemberChangeEvent memberChangeEvent = new MemberChangeEvent(); - memberChangeEvent.setMembers(allMembers); - return memberChangeEvent; + public MembersChangeEvent build() { + MembersChangeEvent membersChangeEvent = new MembersChangeEvent(); + membersChangeEvent.setMembers(allMembers); + return membersChangeEvent; } } } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java index a0bfa53d7..2fe02c285 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java @@ -56,7 +56,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.function.BiFunction; /** * Cluster node management in Nacos. @@ -77,53 +76,52 @@ import java.util.function.BiFunction; * @author liaochuntao */ @Component(value = "serverMemberManager") -@SuppressWarnings("all") public class ServerMemberManager implements ApplicationListener { private final NAsyncHttpClient asyncHttpClient = HttpClientManager.getAsyncHttpClient(); /** - * Cluster node list + * Cluster node list. */ private volatile ConcurrentSkipListMap serverList; /** - * Is this node in the cluster list + * Is this node in the cluster list. */ private volatile boolean isInIpList = true; /** - * port + * port. */ private int port; /** - * Address information for the local node + * Address information for the local node. */ private String localAddress; /** - * Addressing pattern instances + * Addressing pattern instances. */ private MemberLookup lookup; /** - * self member obj + * self member obj. */ private volatile Member self; /** - * here is always the node information of the "UP" state + * here is always the node information of the "UP" state. */ private volatile Set memberAddressInfos = new ConcurrentHashSet<>(); /** - * Broadcast this node element information task + * Broadcast this node element information task. */ private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask(); public ServerMemberManager(ServletContext servletContext) throws Exception { - this.serverList = new ConcurrentSkipListMap(); + this.serverList = new ConcurrentSkipListMap<>(); ApplicationUtils.setContextPath(servletContext.getContextPath()); MemberUtils.setManager(this); @@ -156,14 +154,14 @@ public class ServerMemberManager implements ApplicationListener() { - @Override - public Member apply(String s, Member member) { - if (!NodeState.UP.equals(newMember.getState())) { - memberAddressInfos.remove(newMember.getAddress()); - } - MemberUtils.copy(newMember, member); - return member; + + serverList.computeIfPresent(address, (s, member) -> { + if (NodeState.DOWN.equals(newMember.getState())) { + memberAddressInfos.remove(newMember.getAddress()); } + MemberUtils.copy(newMember, member); + return member; }); + + // member data changes and all listeners need to be notified + NotifyCenter.publishEvent(MembersChangeEvent.builder() + .members(allMembers()) + .build()); + return true; } + /** + * Whether the node exists within the cluster. + * + * @param address ip:port + * @return is exist + */ public boolean hasMember(String address) { boolean result = serverList.containsKey(address); if (!result) { @@ -237,6 +251,11 @@ public class ServerMemberManager implements ApplicationListener allMembers() { // We need to do a copy to avoid affecting the real data HashSet set = new HashSet<>(serverList.values()); @@ -244,6 +263,11 @@ public class ServerMemberManager implements ApplicationListener allMembersWithoutSelf() { List members = new ArrayList<>(serverList.values()); members.remove(self); @@ -273,7 +297,7 @@ public class ServerMemberManager implements ApplicationListener tmpMap = new ConcurrentSkipListMap(); + ConcurrentSkipListMap tmpMap = new ConcurrentSkipListMap<>(); Set tmpAddressInfo = new ConcurrentHashSet<>(); for (Member member : members) { final String address = member.getAddress(); @@ -287,8 +311,8 @@ public class ServerMemberManager implements ApplicationListener oldList = serverList; + Set oldSet = memberAddressInfos; serverList = tmpMap; memberAddressInfos = tmpAddressInfo; @@ -305,25 +329,46 @@ public class ServerMemberManager implements ApplicationListener healthMembers = MemberUtils.selectTargetMembers(members, member -> { + return !NodeState.DOWN.equals(member.getState()); + }); + Event event = MembersChangeEvent.builder().members(finalMembers).build(); NotifyCenter.publishEvent(event); } return hasChange; } + /** + * members join this cluster. + * + * @param members {@link Collection} new members + * @return is success + */ public synchronized boolean memberJoin(Collection members) { Set set = new HashSet<>(members); set.addAll(allMembers()); return memberChange(set); } + /** + * members leave this cluster. + * + * @param members {@link Collection} wait leave members + * @return is success + */ public synchronized boolean memberLeave(Collection members) { Set set = new HashSet<>(allMembers()); set.removeAll(members); return memberChange(set); } + /** + * this member {@link Member#getState()} is health. + * + * @param address ip:port + * @return is health + */ public boolean isUnHealth(String address) { Member member = serverList.get(address); if (member == null) { @@ -347,6 +392,11 @@ public class ServerMemberManager implements ApplicationListener getServerList() { return Collections.unmodifiableMap(serverList); } diff --git a/core/src/main/java/com/alibaba/nacos/core/controller/NacosClusterController.java b/core/src/main/java/com/alibaba/nacos/core/controller/NacosClusterController.java index 16c810195..9e301bf2d 100644 --- a/core/src/main/java/com/alibaba/nacos/core/controller/NacosClusterController.java +++ b/core/src/main/java/com/alibaba/nacos/core/controller/NacosClusterController.java @@ -134,7 +134,7 @@ public class NacosClusterController { @PostMapping(value = "/switch/lookup") public RestResult switchLookup(@RequestParam(name = "type") String type) { try { - memberManager.swithLookup(type); + memberManager.switchLookup(type); return RestResultUtils.success(); } catch (Throwable ex) { return RestResultUtils.failed(ex.getMessage()); diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java b/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java index 0126a97d3..0469abacf 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java @@ -20,7 +20,7 @@ import com.alibaba.nacos.consistency.Config; import com.alibaba.nacos.consistency.ap.APProtocol; import com.alibaba.nacos.consistency.cp.CPProtocol; import com.alibaba.nacos.core.cluster.Member; -import com.alibaba.nacos.core.cluster.MemberChangeEvent; +import com.alibaba.nacos.core.cluster.MembersChangeEvent; import com.alibaba.nacos.core.cluster.MemberChangeListener; import com.alibaba.nacos.core.cluster.MemberMetaDataConstants; import com.alibaba.nacos.core.cluster.MemberUtils; @@ -62,6 +62,8 @@ public class ProtocolManager implements ApplicationListener private boolean cpInit = false; + private Set oldMembers; + private static Set toAPMembersInfo(Collection members) { Set nodes = new HashSet<>(); members.forEach(member -> nodes.add(member.getAddress())); @@ -154,23 +156,34 @@ public class ProtocolManager implements ApplicationListener } @Override - public void onEvent(MemberChangeEvent event) { + public void onEvent(MembersChangeEvent event) { // Here, the sequence of node change events is very important. For example, // node change event A occurs at time T1, and node change event B occurs at // time T2 after a period of time. // (T1 < T2) - Set copy = new HashSet<>(event.getMembers()); + + if (oldMembers == null) { + oldMembers = new HashSet<>(copy); + } else { + oldMembers.removeAll(copy); + } - // Node change events between different protocols should not block each other. - // and we use a single thread pool to inform the consistency layer of node changes, - // to avoid multiple tasks simultaneously carrying out the consistency layer of - // node changes operation - if (Objects.nonNull(apProtocol)) { - ProtocolExecutor.apMemberChange(() -> apProtocol.memberChange(toAPMembersInfo(copy))); - } - if (Objects.nonNull(cpProtocol)) { - ProtocolExecutor.cpMemberChange(() -> cpProtocol.memberChange(toCPMembersInfo(copy))); + if (!oldMembers.isEmpty()) { + // Node change events between different protocols should not block each other. + // and we use a single thread pool to inform the consistency layer of node changes, + // to avoid multiple tasks simultaneously carrying out the consistency layer of + // node changes operation + if (Objects.nonNull(apProtocol)) { + ProtocolExecutor.apMemberChange(() -> apProtocol.memberChange(toAPMembersInfo(copy))); + } + if (Objects.nonNull(cpProtocol)) { + ProtocolExecutor.cpMemberChange(() -> cpProtocol.memberChange(toCPMembersInfo(copy))); + } } + + // remove old members info + oldMembers.clear(); + oldMembers.addAll(copy); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java index e8cc0f882..beb4ed27c 100644 --- a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java +++ b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java @@ -71,7 +71,7 @@ public class WatchFileCenter { /** * Register {@link FileWatcher} in this directory. * - * @param paths directory + * @param paths directory * @param watcher {@link FileWatcher} * @return register is success * @throws NacosException NacosException @@ -131,7 +131,7 @@ public class WatchFileCenter { /** * Deregister {@link FileWatcher} in this directory. * - * @param path directory + * @param path directory * @param watcher {@link FileWatcher} * @return deregister is success */ diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java index 84bd4483c..86731038c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java @@ -18,7 +18,7 @@ package com.alibaba.nacos.naming.cluster; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.core.cluster.Member; -import com.alibaba.nacos.core.cluster.MemberChangeEvent; +import com.alibaba.nacos.core.cluster.MembersChangeEvent; import com.alibaba.nacos.core.cluster.MemberChangeListener; import com.alibaba.nacos.core.cluster.MemberMetaDataConstants; import com.alibaba.nacos.core.cluster.NodeState; @@ -87,7 +87,7 @@ public class ServerListManager implements MemberChangeListener { */ public boolean contains(String serverAddress) { for (Member server : getServers()) { - if (Objects.equals(server, server.getAddress())) { + if (Objects.equals(serverAddress, server.getAddress())) { return true; } } @@ -99,7 +99,7 @@ public class ServerListManager implements MemberChangeListener { } @Override - public void onEvent(MemberChangeEvent event) { + public void onEvent(MembersChangeEvent event) { this.servers = new ArrayList<>(event.getMembers()); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java index cf83507e4..b32eaee7e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java @@ -19,7 +19,7 @@ package com.alibaba.nacos.naming.consistency.persistent.raft; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.MemberChangeListener; -import com.alibaba.nacos.core.cluster.MemberChangeEvent; +import com.alibaba.nacos.core.cluster.MembersChangeEvent; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.notify.NotifyCenter; import com.alibaba.nacos.core.utils.ApplicationUtils; @@ -67,6 +67,8 @@ public class RaftPeerSet implements MemberChangeListener { private volatile boolean ready = false; + private Set oldMembers; + public RaftPeerSet(ServerMemberManager memberManager) { this.memberManager = memberManager; } @@ -299,8 +301,20 @@ public class RaftPeerSet implements MemberChangeListener { } @Override - public void onEvent(MemberChangeEvent event) { - changePeers(event.getMembers()); + public void onEvent(MembersChangeEvent event) { + Collection members = event.getMembers(); + if (oldMembers == null) { + oldMembers = new HashSet<>(members); + } else { + oldMembers.removeAll(members); + } + + if (!oldMembers.isEmpty()) { + changePeers(members); + } + + oldMembers.clear(); + oldMembers.addAll(members); } private void changePeers(Collection members) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java b/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java index 360fc36e6..1ec835db7 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java @@ -17,8 +17,9 @@ package com.alibaba.nacos.naming.core; import com.alibaba.nacos.core.cluster.MemberChangeListener; -import com.alibaba.nacos.core.cluster.MemberChangeEvent; import com.alibaba.nacos.core.cluster.MemberUtils; +import com.alibaba.nacos.core.cluster.MembersChangeEvent; +import com.alibaba.nacos.core.cluster.NodeState; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.notify.NotifyCenter; import com.alibaba.nacos.core.utils.ApplicationUtils; @@ -129,14 +130,16 @@ public class DistroMapper implements MemberChangeListener { } @Override - public void onEvent(MemberChangeEvent event) { + public void onEvent(MembersChangeEvent event) { // Here, the node list must be sorted to ensure that all nacos-server's // node list is in the same order - List list = MemberUtils.simpleMembers(event.getMembers()); + List list = MemberUtils.simpleMembers(MemberUtils + .selectTargetMembers(event.getMembers(), member -> !NodeState.DOWN.equals(member.getState()))); Collections.sort(list); Collection old = healthyList; healthyList = Collections.unmodifiableList(list); Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, old: {}, new: {}", old, healthyList); + old.clear(); } @Override diff --git a/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java index 068168379..b84fd6ce5 100644 --- a/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/core/cluster/ServerMemberManager_ITCase.java @@ -17,16 +17,20 @@ package com.alibaba.nacos.test.core.cluster; import com.alibaba.nacos.core.cluster.Member; -import com.alibaba.nacos.core.cluster.MemberChangeEvent; +import com.alibaba.nacos.core.cluster.MembersChangeEvent; import com.alibaba.nacos.core.cluster.MemberUtils; +import com.alibaba.nacos.core.cluster.NodeState; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.notify.Event; import com.alibaba.nacos.core.notify.NotifyCenter; import com.alibaba.nacos.core.notify.listener.Subscribe; import com.alibaba.nacos.core.utils.ApplicationUtils; +import com.alibaba.nacos.core.utils.Constants; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; @@ -36,101 +40,154 @@ import org.springframework.mock.web.MockServletContext; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** + * Cluster node manages unit tests. + * * @author liaochuntao */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ServerMemberManager_ITCase { - - private ServerMemberManager memberManager; - - { - try { - memberManager = new ServerMemberManager(new MockServletContext()); - } - catch (Exception e) { - e.printStackTrace(); - } - } - - @Before - public void init() throws Exception { - ApplicationUtils.setIsStandalone(true); - ApplicationUtils.injectEnvironment(new StandardEnvironment()); - } - - @After - public void after() throws Exception { - memberManager.shutdown(); - } - - @Test - public void test_k_isFirst() { - String firstIp = "127.0.0.1:8847"; - String secondIp = "127.0.0.1:8847"; - String thirdIp = "127.0.0.1:8847"; - ConcurrentSkipListMap map = new ConcurrentSkipListMap<>(); - map.put(secondIp, Member.builder() - .ip("127.0.0.1") - .port(8847) - .build()); - map.put(firstIp, Member.builder() - .ip("127.0.0.1") - .port(8848) - .build()); - map.put(thirdIp, Member.builder() - .ip("127.0.0.1") - .port(8849) - .build()); - - List members = new ArrayList(map.values()); - Collections.sort(members); - List ss = MemberUtils.simpleMembers(members); - - Assert.assertEquals(ss.get(0), members.get(0).getAddress()); - } - - @Test - public void test_a_member_change() throws Exception { - - AtomicInteger integer = new AtomicInteger(0); - CountDownLatch latch = new CountDownLatch(1); - - NotifyCenter.registerSubscribe(new Subscribe() { - @Override - public void onEvent(MemberChangeEvent event) { - integer.incrementAndGet(); - latch.countDown(); - } - - @Override - public Class subscribeType() { - return MemberChangeEvent.class; - } - }); - Collection members = memberManager.allMembers(); - - System.out.println(members); - - memberManager.memberJoin(members); - - members.add(Member.builder() - .ip("115.159.3.213") - .port(8848) - .build()); - - boolean changed = memberManager.memberJoin(members); - Assert.assertTrue(changed); - - latch.await(10_000L, TimeUnit.MILLISECONDS); - - Assert.assertEquals(1, integer.get()); - } - + + private ServerMemberManager memberManager; + + @BeforeClass + public static void initClass() throws Exception { + System.setProperty(Constants.NACOS_SERVER_IP, "127.0.0.1"); + System.setProperty("server.port", "8847"); + ApplicationUtils.setIsStandalone(true); + ApplicationUtils.injectEnvironment(new StandardEnvironment()); + } + + @AfterClass + public static void destroyClass() { + System.clearProperty(Constants.NACOS_SERVER_IP); + System.clearProperty("server.port"); + } + + @Before + public void before() throws Exception { + memberManager = new ServerMemberManager(new MockServletContext()); + } + + @After + public void after() throws Exception { + memberManager.shutdown(); + } + + @Test + public void testKisFirst() { + String firstIp = "127.0.0.1:8847"; + String secondIp = "127.0.0.1:8848"; + String thirdIp = "127.0.0.1:8849"; + + Map map = new HashMap<>(4); + map.put(firstIp, Member.builder().ip("127.0.0.1").port(8847).state(NodeState.UP).build()); + map.put(secondIp, Member.builder().ip("127.0.0.1").port(8848).state(NodeState.UP).build()); + map.put(thirdIp, Member.builder().ip("127.0.0.1").port(8849).state(NodeState.UP).build()); + + List members = new ArrayList(map.values()); + Collections.sort(members); + List ss = MemberUtils.simpleMembers(members); + + Assert.assertEquals(ss.get(0), members.get(0).getAddress()); + } + + @Test + public void testMemberChange() throws Exception { + + AtomicInteger integer = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(1); + + NotifyCenter.registerSubscribe(new Subscribe() { + @Override + public void onEvent(MembersChangeEvent event) { + integer.incrementAndGet(); + latch.countDown(); + } + + @Override + public Class subscribeType() { + return MembersChangeEvent.class; + } + }); + Collection members = memberManager.allMembers(); + + System.out.println(members); + + memberManager.memberJoin(members); + + members.add(Member.builder().ip("115.159.3.213").port(8848).build()); + + boolean changed = memberManager.memberJoin(members); + Assert.assertTrue(changed); + + latch.await(10_000L, TimeUnit.MILLISECONDS); + + Assert.assertEquals(1, integer.get()); + } + + @Test + public void testMemberHealthCheck() throws Exception { + AtomicReference> healthMembers = new AtomicReference<>(); + CountDownLatch first = new CountDownLatch(1); + CountDownLatch second = new CountDownLatch(1); + NotifyCenter.registerSubscribe(new Subscribe() { + @Override + public void onEvent(MembersChangeEvent event) { + System.out.println(event); + healthMembers.set(MemberUtils.selectTargetMembers(event.getMembers(), member -> !NodeState.DOWN.equals(member.getState()))); + if (first.getCount() == 1) { + first.countDown(); + return; + } + if (second.getCount() == 1) { + second.countDown(); + } + } + + @Override + public Class subscribeType() { + return MembersChangeEvent.class; + } + }); + + String firstIp = "127.0.0.1:8847"; + String secondIp = "127.0.0.1:8848"; + String thirdIp = "127.0.0.1:8849"; + + Map map = new HashMap<>(4); + map.put(firstIp, Member.builder().ip("127.0.0.1").port(8847).state(NodeState.UP).build()); + map.put(secondIp, Member.builder().ip("127.0.0.1").port(8848).state(NodeState.UP).build()); + map.put(thirdIp, Member.builder().ip("127.0.0.1").port(8849).state(NodeState.UP).build()); + + Set firstMemberList = new HashSet<>(map.values()); + + memberManager.memberJoin(map.values()); + + first.await(); + Set copy = new HashSet<>(firstMemberList); + copy.removeAll(healthMembers.get()); + Assert.assertTrue(copy.isEmpty()); + + Member member = map.get(firstIp); + member.setState(NodeState.DOWN); + Assert.assertTrue(memberManager.update(member)); + + second.await(); + copy = new HashSet<>(firstMemberList); + copy.removeAll(healthMembers.get()); + Assert.assertEquals(1, copy.size()); + Assert.assertTrue(copy.contains(map.get(firstIp))); + } + }