#650 Add ServerListManager

This commit is contained in:
nkorange 2019-01-19 18:24:50 +08:00
parent d064fe36bb
commit bf80ff1622
17 changed files with 421 additions and 331 deletions

View File

@ -29,7 +29,6 @@ import org.springframework.stereotype.Component;
@Component("nacosApplicationContext") @Component("nacosApplicationContext")
public class SpringContext implements ApplicationContextAware { public class SpringContext implements ApplicationContextAware {
@Autowired
static ApplicationContext context; static ApplicationContext context;
@Override @Override

View File

@ -0,0 +1,167 @@
package com.alibaba.nacos.naming.cluster;
import com.alibaba.nacos.common.util.SystemUtils;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.cluster.members.Member;
import com.alibaba.nacos.naming.cluster.members.MemberChangeListener;
import com.alibaba.nacos.naming.consistency.persistent.simpleraft.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import static com.alibaba.nacos.common.util.SystemUtils.*;
/**
* The manager to globally refresh and operate server list.
*
* @author <a href="mailto:zpf.073@gmail.com">nkorange</a>
* @since 1.0.0
*/
@Component("serverListManager")
public class ServerListManager {
private List<MemberChangeListener> listeners = new ArrayList<>();
private List<Member> members = new ArrayList<>();
private List<Member> reachableMembers = new ArrayList<>();
public void listen(MemberChangeListener listener) {
listeners.add(listener);
}
@PostConstruct
public void init() {
GlobalExecutor.registerServerListUpdater(new ServerListUpdater());
}
private List<Member> refreshServerList() {
List<Member> result = new ArrayList<>();
if (STANDALONE_MODE) {
Member member = new Member();
member.setIp(NetUtils.getLocalAddress());
member.setServePort(RunningConfig.getServerPort());
result.add(member);
return result;
}
List<String> serverList = new ArrayList<>();
try {
serverList = readClusterConf();
} catch (Exception e) {
Loggers.SRV_LOG.warn("failed to get config: " + CLUSTER_CONF_FILE_PATH, e);
}
if (Loggers.DEBUG_LOG.isDebugEnabled()) {
Loggers.DEBUG_LOG.debug("SERVER-LIST from cluster.conf: {}", result);
}
//use system env
if (CollectionUtils.isEmpty(serverList)) {
serverList = SystemUtils.getIPsBySystemEnv(UtilsAndCommons.SELF_SERVICE_CLUSTER_ENV);
if (Loggers.DEBUG_LOG.isDebugEnabled()) {
Loggers.DEBUG_LOG.debug("SERVER-LIST from system variable: {}", result);
}
}
if (CollectionUtils.isNotEmpty(serverList)) {
for (int i = 0; i < serverList.size(); i++) {
String ip;
int port;
if (serverList.get(0).contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
ip = serverList.get(i).split(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)[0];
port = Integer.parseInt(serverList.get(i).split(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)[1]);
} else {
ip = serverList.get(i);
port = RunningConfig.getServerPort();
}
Member member = new Member();
member.setIp(ip);
member.setServePort(port);
result.add(member);
}
}
return result;
}
public boolean contains(String server) {
for (Member member : members) {
if (member.getKey().equals(server)) {
return true;
}
}
return false;
}
public List<Member> getMembers() {
return members;
}
public List<Member> getReachableMembers() {
return reachableMembers;
}
private void notifyListeners() {
GlobalExecutor.submit(new Runnable() {
@Override
public void run() {
for (MemberChangeListener listener : listeners) {
listener.onChangeMemberList(members);
}
}
});
}
public class ServerListUpdater implements Runnable {
@Override
public void run() {
try {
List<Member> servers = refreshServerList();
List<Member> oldServers = members;
if (CollectionUtils.isEmpty(servers)) {
Loggers.RAFT.warn("refresh server list failed, ignore it.");
return;
}
boolean changed = false;
List<Member> newServers = (List<Member>) CollectionUtils.subtract(servers, oldServers);
if (CollectionUtils.isNotEmpty(newServers)) {
members.addAll(newServers);
changed = true;
Loggers.RAFT.info("server list is updated, new: {} servers: {}", newServers.size(), newServers);
}
List<Member> deadServers = (List<Member>) CollectionUtils.subtract(oldServers, servers);
if (CollectionUtils.isNotEmpty(deadServers)) {
members.removeAll(deadServers);
changed = true;
Loggers.RAFT.info("server list is updated, dead: {}, servers: {}", deadServers.size(), deadServers);
}
if (changed) {
notifyListeners();
}
} catch (Exception e) {
Loggers.RAFT.info("error while updating server list.", e);
}
}
}
}

View File

@ -0,0 +1,68 @@
package com.alibaba.nacos.naming.cluster.members;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
/**
* Member node of Nacos cluster
*
* @author <a href="mailto:zpf.073@gmail.com">nkorange</a>
* @since 1.0.0
*/
public class Member {
/**
* IP of member
*/
private String ip;
/**
* serving port of member.
*/
private int servePort;
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getServePort() {
return servePort;
}
public void setServePort(int servePort) {
this.servePort = servePort;
}
public String getKey() {
return ip + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + servePort;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Member member = (Member) o;
return servePort == member.servePort && ip.equals(member.ip);
}
@Override
public int hashCode() {
int result = ip.hashCode();
result = 31 * result + servePort;
return result;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,26 @@
package com.alibaba.nacos.naming.cluster.members;
import java.util.List;
/**
* Nacos cluster member change event listener
*
* @author <a href="mailto:zpf.073@gmail.com">nkorange</a>
* @since 1.0.0
*/
public interface MemberChangeListener {
/**
* If member list changed, this method is invoked.
*
* @param latestMembers members after chang
*/
void onChangeMemberList(List<Member> latestMembers);
/**
* If reachable member list changed, this method is invoked.
*
* @param latestReachableMembers reachable members after change
*/
void onChangeReachableMemberList(List<Member> latestReachableMembers);
}

View File

@ -33,9 +33,10 @@ public class GlobalExecutor {
public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L); public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L);
public static final long ADDRESS_SERVER_UPDATE_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5L); private static final long NACOS_SERVER_LIST_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(5);
private static ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() { private static ScheduledExecutorService executorService =
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread t = new Thread(r); Thread t = new Thread(r);
@ -47,15 +48,23 @@ public class GlobalExecutor {
} }
}); });
public static void register(Runnable runnable) { public static void registerMasterElection(Runnable runnable) {
executorService.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS); executorService.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
} }
public static void register1(Runnable runnable) { public static void registerServerListUpdater(Runnable runnable) {
executorService.scheduleAtFixedRate(runnable, 0, NACOS_SERVER_LIST_REFRESH_INTERVAL, TimeUnit.MILLISECONDS);
}
public static void registerHeartbeat(Runnable runnable) {
executorService.scheduleWithFixedDelay(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS); executorService.scheduleWithFixedDelay(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
} }
public static void register(Runnable runnable, long delay) { public static void schedule(Runnable runnable, long delay) {
executorService.scheduleAtFixedRate(runnable, 0, delay, TimeUnit.MILLISECONDS); executorService.scheduleAtFixedRate(runnable, 0, delay, TimeUnit.MILLISECONDS);
} }
public static void submit(Runnable runnable) {
executorService.submit(runnable);
}
} }

View File

@ -25,7 +25,6 @@ import com.alibaba.nacos.naming.misc.*;
import com.alibaba.nacos.naming.monitor.MetricsMonitor; import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.ning.http.client.AsyncCompletionHandler; import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.Response; import com.ning.http.client.Response;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.math.NumberUtils;
import org.javatuples.Pair; import org.javatuples.Pair;
@ -57,8 +56,6 @@ public class RaftCore {
public static final String API_PUB = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft/publish"; public static final String API_PUB = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft/publish";
public static final String API_UNSF_PUB = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft/unSafePublish";
public static final String API_DEL = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft/delete"; public static final String API_DEL = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft/delete";
public static final String API_GET = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft/get"; public static final String API_GET = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft/get";
@ -85,18 +82,14 @@ public class RaftCore {
public static final int PUBLISH_TERM_INCREASE_COUNT = 100; public static final int PUBLISH_TERM_INCREASE_COUNT = 100;
private static final int INIT_LOCK_TIME_SECONDS = 3;
private volatile boolean initialized = false; private volatile boolean initialized = false;
private static Lock lock = new ReentrantLock();
private volatile Map<String, List<DataListener>> listeners = new ConcurrentHashMap<>(); private volatile Map<String, List<DataListener>> listeners = new ConcurrentHashMap<>();
private volatile ConcurrentMap<String, Datum> datums = new ConcurrentHashMap<String, Datum>(); private volatile ConcurrentMap<String, Datum> datums = new ConcurrentHashMap<String, Datum>();
@Autowired @Autowired
private PeerSet peers; private RaftPeerSet peers;
@Autowired @Autowired
private SwitchDomain switchDomain; private SwitchDomain switchDomain;
@ -116,8 +109,6 @@ public class RaftCore {
executor.submit(notifier); executor.submit(notifier);
peers.add(NamingProxy.getServers());
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
ConcurrentMap<String, Datum> datumMap = raftStore.loadDatums(); ConcurrentMap<String, Datum> datumMap = raftStore.loadDatums();
@ -130,8 +121,7 @@ public class RaftCore {
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L)); setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
Loggers.RAFT.info("cache loaded, peer count: {}, datum count: {}, current term: {}", Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
peers.size(), datums.size(), peers.getTerm());
while (true) { while (true) {
if (notifier.tasks.size() <= 0) { if (notifier.tasks.size() <= 0) {
@ -142,18 +132,8 @@ public class RaftCore {
Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start)); Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
GlobalExecutor.register(new MasterElection()); GlobalExecutor.registerMasterElection(new MasterElection());
GlobalExecutor.register1(new HeartBeat()); GlobalExecutor.registerHeartbeat(new HeartBeat());
GlobalExecutor.register(new AddressServerUpdater(), GlobalExecutor.ADDRESS_SERVER_UPDATE_INTERVAL_MS);
if (peers.size() > 0) {
if (lock.tryLock(INIT_LOCK_TIME_SECONDS, TimeUnit.SECONDS)) {
initialized = true;
lock.unlock();
}
} else {
throw new Exception("peers is empty.");
}
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
@ -774,40 +754,6 @@ public class RaftCore {
return local; return local;
} }
public class AddressServerUpdater implements Runnable {
@Override
public void run() {
try {
List<String> servers = NamingProxy.getServers();
List<RaftPeer> peerList = new ArrayList<RaftPeer>(peers.allPeers());
List<String> oldServers = new ArrayList<String>();
if (CollectionUtils.isEmpty(servers)) {
Loggers.RAFT.warn("get empty server list from address server,ignore it.");
return;
}
for (RaftPeer peer : peerList) {
oldServers.add(peer.ip);
}
List<String> newServers = (List<String>) CollectionUtils.subtract(servers, oldServers);
if (!CollectionUtils.isEmpty(newServers)) {
peers.add(newServers);
Loggers.RAFT.info("server list is updated, new: {} servers: {}", newServers.size(), newServers);
}
List<String> deadServers = (List<String>) CollectionUtils.subtract(oldServers, servers);
if (!CollectionUtils.isEmpty(deadServers)) {
peers.remove(deadServers);
Loggers.RAFT.info("server list is updated, dead: {}, servers: {}", deadServers.size(), deadServers);
}
} catch (Exception e) {
Loggers.RAFT.info("error while updating server list.", e);
}
}
}
public void listen(String key, DataListener listener) { public void listen(String key, DataListener listener) {
List<DataListener> listenerList = listeners.get(key); List<DataListener> listenerList = listeners.get(key);
@ -847,7 +793,8 @@ public class RaftCore {
for (DataListener dl : listeners.get(key)) { for (DataListener dl : listeners.get(key)) {
// TODO maybe use equal: // TODO maybe use equal:
if (dl == listener) { if (dl == listener) {
listeners.remove(listener); listeners.get(key).remove(listener);
break;
} }
} }
} }
@ -891,11 +838,11 @@ public class RaftCore {
return new ArrayList<RaftPeer>(peers.allPeers()); return new ArrayList<RaftPeer>(peers.allPeers());
} }
public PeerSet getPeerSet() { public RaftPeerSet getPeerSet() {
return peers; return peers;
} }
public void setPeerSet(PeerSet peerSet) { public void setPeerSet(RaftPeerSet peerSet) {
peers = peerSet; peers = peerSet;
} }

View File

@ -34,7 +34,7 @@ public class RaftPeer {
public volatile long leaderDueMs = RandomUtils.nextLong(0, GlobalExecutor.LEADER_TIMEOUT_MS); public volatile long leaderDueMs = RandomUtils.nextLong(0, GlobalExecutor.LEADER_TIMEOUT_MS);
public volatile long heartbeatDueMs = RandomUtils.nextLong(0, GlobalExecutor.HEARTBEAT_INTERVAL_MS); public volatile long heartbeatDueMs = RandomUtils.nextLong(0, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
public State state = State.FOLLOWER; public State state = State.FOLLOWER;

View File

@ -16,19 +16,29 @@
package com.alibaba.nacos.naming.consistency.persistent.simpleraft; package com.alibaba.nacos.naming.consistency.persistent.simpleraft;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.common.util.SystemUtils;
import com.alibaba.nacos.naming.boot.RunningConfig; import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.members.Member;
import com.alibaba.nacos.naming.cluster.members.MemberChangeListener;
import com.alibaba.nacos.naming.misc.HttpClient; import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NetUtils; import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.ning.http.client.AsyncCompletionHandler; import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.Response; import com.ning.http.client.Response;
import org.apache.commons.collections.SortedBag; import org.apache.commons.collections.SortedBag;
import org.apache.commons.collections.bag.TreeBag; import org.apache.commons.collections.bag.TreeBag;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.util.PropertySource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import static com.alibaba.nacos.common.util.SystemUtils.STANDALONE_MODE; import static com.alibaba.nacos.common.util.SystemUtils.STANDALONE_MODE;
@ -36,7 +46,13 @@ import static com.alibaba.nacos.common.util.SystemUtils.STANDALONE_MODE;
* @author nacos * @author nacos
*/ */
@Component @Component
public class PeerSet { @DependsOn("serverListManager")
public class RaftPeerSet implements MemberChangeListener {
@Autowired
private ServerListManager serverListManager;
private AtomicLong localTerm = new AtomicLong(0L);
private RaftPeer leader = null; private RaftPeer leader = null;
@ -46,7 +62,13 @@ public class PeerSet {
private boolean ready = false; private boolean ready = false;
public PeerSet() { public RaftPeerSet() {
}
@PostConstruct
public void init() {
serverListManager.listen(this);
} }
public RaftPeer getLeader() { public RaftPeer getLeader() {
@ -199,6 +221,13 @@ public class PeerSet {
public RaftPeer local() { public RaftPeer local() {
RaftPeer peer = peers.get(NetUtils.localServer()); RaftPeer peer = peers.get(NetUtils.localServer());
if (peer == null && SystemUtils.STANDALONE_MODE) {
RaftPeer localPeer = new RaftPeer();
localPeer.ip = NetUtils.localServer();
localPeer.term.set(localTerm.get());
peers.put(localPeer.ip, localPeer);
return localPeer;
}
if (peer == null) { if (peer == null) {
throw new IllegalStateException("unable to find local peer: " + NetUtils.localServer() + ", all peers: " throw new IllegalStateException("unable to find local peer: " + NetUtils.localServer() + ", all peers: "
+ Arrays.toString(peers.keySet().toArray())); + Arrays.toString(peers.keySet().toArray()));
@ -232,13 +261,39 @@ public class PeerSet {
} }
local.term.set(term); local.term.set(term);
localTerm.set(term);
} }
public long getTerm() { public long getTerm() {
return local().term.get(); return localTerm.get();
} }
public boolean contains(RaftPeer remote) { public boolean contains(RaftPeer remote) {
return peers.containsKey(remote.ip); return peers.containsKey(remote.ip);
} }
@Override
public void onChangeMemberList(List<Member> latestMembers) {
Map<String, RaftPeer> tmpPeers = new HashMap<>(8);
for (Member member : latestMembers) {
if (peers.containsKey(member.getKey())) {
tmpPeers.put(member.getKey(), peers.get(member.getKey()));
continue;
}
RaftPeer raftPeer = new RaftPeer();
raftPeer.ip = member.getKey();
tmpPeers.put(member.getKey(), raftPeer);
}
// replace raft peer set:
peers = tmpPeers;
}
@Override
public void onChangeReachableMemberList(List<Member> latestReachableMembers) {
}
} }

View File

@ -17,6 +17,8 @@ package com.alibaba.nacos.naming.core;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.naming.boot.RunningConfig; import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.members.Member;
import com.alibaba.nacos.naming.misc.*; import com.alibaba.nacos.naming.misc.*;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
@ -47,8 +49,6 @@ public class DistroMapper {
private Set<String> liveSites = new HashSet<String>(); private Set<String> liveSites = new HashSet<String>();
private String localhostIP;
public final String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE; public final String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE;
private long lastHealthServerMillis = 0L; private long lastHealthServerMillis = 0L;
@ -60,46 +60,19 @@ public class DistroMapper {
@Autowired @Autowired
private SwitchDomain switchDomain; private SwitchDomain switchDomain;
@Autowired
private ServerListManager serverListManager;
/** /**
* init server list * init server list
*/ */
@PostConstruct @PostConstruct
public void init() { public void init() {
localhostIP = NetUtils.localServer();
List<String> servers = NamingProxy.getServers();
while (servers == null || servers.size() == 0) {
Loggers.SRV_LOG.warn("[DISTRO-MAPPER] Server list is empty, sleep 3 seconds and try again.");
try {
TimeUnit.SECONDS.sleep(3);
servers = NamingProxy.getServers();
} catch (InterruptedException e) {
Loggers.SRV_LOG.warn("[DISTRO-MAPPER] Sleeping thread is interupted, try again.");
}
}
StringBuilder sb = new StringBuilder();
for (String serverIP : servers) {
String serverSite;
String serverConfig;
serverSite = UtilsAndCommons.UNKNOWN_SITE;
serverConfig = serverSite + "#" + serverIP + "#" + System.currentTimeMillis() + "#" + 1 + "\r\n";
sb.append(serverConfig);
}
onServerStatusUpdate(sb.toString(), false);
UtilsAndCommons.SERVER_STATUS_EXECUTOR.schedule(new ServerStatusReporter(), UtilsAndCommons.SERVER_STATUS_EXECUTOR.schedule(new ServerStatusReporter(),
60000, TimeUnit.MILLISECONDS); 60000, TimeUnit.MILLISECONDS);
} }
private void onServerStatusUpdate(String configInfo, boolean isFromDiamond) { private void onServerStatusUpdate(String configInfo) {
String[] configs = configInfo.split("\r\n"); String[] configs = configInfo.split("\r\n");
if (configs.length == 0) { if (configs.length == 0) {
@ -214,7 +187,7 @@ public class DistroMapper {
server.ip = params[1]; server.ip = params[1];
server.lastRefTime = Long.parseLong(params[2]); server.lastRefTime = Long.parseLong(params[2]);
if (!NamingProxy.getServers().contains(server.ip)) { if (!serverListManager.contains(server.ip)) {
throw new IllegalArgumentException("ip: " + server.ip + " is not in serverlist"); throw new IllegalArgumentException("ip: " + server.ip + " is not in serverlist");
} }
@ -329,8 +302,8 @@ public class DistroMapper {
return false; return false;
} }
int index = healthyList.indexOf(localhostIP); int index = healthyList.indexOf(NetUtils.localServer());
int lastIndex = healthyList.lastIndexOf(localhostIP); int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
if (lastIndex < 0 || index < 0) { if (lastIndex < 0 || index < 0) {
return true; return true;
} }
@ -341,15 +314,15 @@ public class DistroMapper {
public String mapSrv(String dom) { public String mapSrv(String dom) {
if (CollectionUtils.isEmpty(healthyList) || !switchDomain.distroEnabled) { if (CollectionUtils.isEmpty(healthyList) || !switchDomain.distroEnabled) {
return localhostIP; return NetUtils.localServer();
} }
try { try {
return healthyList.get(distroHash(dom) % healthyList.size()); return healthyList.get(distroHash(dom) % healthyList.size());
} catch (Exception e) { } catch (Exception e) {
Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + localhostIP, e); Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e);
return localhostIP; return NetUtils.localServer();
} }
} }
@ -371,7 +344,7 @@ public class DistroMapper {
for (Map.Entry<String, List<Server>> entry : distroConfig.entrySet()) { for (Map.Entry<String, List<Server>> entry : distroConfig.entrySet()) {
for (Server server : entry.getValue()) { for (Server server : entry.getValue()) {
//request other server to clean invalid servers //request other server to clean invalid servers
if (!server.ip.equals(localhostIP)) { if (!server.ip.equals(NetUtils.localServer())) {
requestOtherServerCleanInvalidServers(server.ip); requestOtherServerCleanInvalidServers(server.ip);
} }
} }
@ -417,10 +390,6 @@ public class DistroMapper {
} }
} }
public String getLocalhostIP() {
return localhostIP;
}
@SuppressFBWarnings("URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") @SuppressFBWarnings("URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
public static class Server { public static class Server {
public String site = UtilsAndCommons.UNKNOWN_SITE; public String site = UtilsAndCommons.UNKNOWN_SITE;
@ -478,35 +447,29 @@ public class DistroMapper {
weight = 1; weight = 1;
} }
localhostIP = NetUtils.localServer();
long curTime = System.currentTimeMillis(); long curTime = System.currentTimeMillis();
String status = LOCALHOST_SITE + "#" + localhostIP + "#" + curTime + "#" + weight + "\r\n"; String status = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + curTime + "#" + weight + "\r\n";
//send status to itself //send status to itself
onReceiveServerStatus(status); onReceiveServerStatus(status);
List<String> allServers = NamingProxy.getServers(); List<Member> allServers = serverListManager.getMembers();
if (!allServers.contains(localhostIP)) { if (!serverListManager.contains(NetUtils.localServer())) {
Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}", NetUtils.localServer(), allServers);
return; return;
} }
if (allServers.size() > 0 && !localhostIP.contains(UtilsAndCommons.LOCAL_HOST_IP)) { if (allServers.size() > 0 && !NetUtils.localServer().contains(UtilsAndCommons.LOCAL_HOST_IP)) {
for (String server : allServers) { for (Member server : allServers) {
if (server.equals(localhostIP)) { if (server.getKey().equals(NetUtils.localServer())) {
continue; continue;
} }
if (!allServers.contains(localhostIP)) {
Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}", localhostIP, allServers);
return;
}
Message msg = new Message(); Message msg = new Message();
msg.setData(status); msg.setData(status);
synchronizer.send(server, msg); synchronizer.send(server.getKey(), msg);
} }
} }

View File

@ -20,6 +20,8 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.members.Member;
import com.alibaba.nacos.naming.consistency.ConsistencyService; import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.naming.consistency.DataListener; import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.consistency.persistent.simpleraft.Datum; import com.alibaba.nacos.naming.consistency.persistent.simpleraft.Datum;
@ -75,6 +77,9 @@ public class ServiceManager implements DataListener {
@Autowired @Autowired
private DistroMapper distroMapper; private DistroMapper distroMapper;
@Autowired
private ServerListManager serverListManager;
@Autowired @Autowired
private PushService pushService; private PushService pushService;
@ -94,14 +99,6 @@ public class ServiceManager implements DataListener {
@PostConstruct @PostConstruct
public void init() { public void init() {
// wait until distro-mapper ready because domain distribution check depends on it
// TODO may be not necessary:
while (distroMapper.getLiveSites().size() == 0) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
} catch (InterruptedException ignore) {
}
}
UtilsAndCommons.DOMAIN_SYNCHRONIZATION_EXECUTOR.schedule(new DomainReporter(), 60000, TimeUnit.MILLISECONDS); UtilsAndCommons.DOMAIN_SYNCHRONIZATION_EXECUTOR.schedule(new DomainReporter(), 60000, TimeUnit.MILLISECONDS);
@ -119,7 +116,6 @@ public class ServiceManager implements DataListener {
return serviceMap.get(namespaceId); return serviceMap.get(namespaceId);
} }
public void addUpdatedDom2Queue(String namespaceId, String domName, String serverIP, String checksum) { public void addUpdatedDom2Queue(String namespaceId, String domName, String serverIP, String checksum) {
lock.lock(); lock.lock();
try { try {
@ -171,7 +167,7 @@ public class ServiceManager implements DataListener {
addLockIfAbsent(UtilsAndCommons.assembleFullServiceName(dom.getNamespaceId(), dom.getName())); addLockIfAbsent(UtilsAndCommons.assembleFullServiceName(dom.getNamespaceId(), dom.getName()));
putDomain(dom); putDomain(dom);
dom.init(); dom.init();
consistencyService.listen(UtilsAndCommons.getDomStoreKey(dom), dom); consistencyService.listen(UtilsAndCommons.getIPListStoreKey(dom), dom);
Loggers.SRV_LOG.info("[NEW-DOM-RAFT] {}", dom.toJSON()); Loggers.SRV_LOG.info("[NEW-DOM-RAFT] {}", dom.toJSON());
} }
wakeUp(UtilsAndCommons.assembleFullServiceName(dom.getNamespaceId(), dom.getName())); wakeUp(UtilsAndCommons.assembleFullServiceName(dom.getNamespaceId(), dom.getName()));
@ -695,17 +691,17 @@ public class ServiceManager implements DataListener {
msg.setData(JSON.toJSONString(checksum)); msg.setData(JSON.toJSONString(checksum));
List<String> sameSiteServers = NamingProxy.getSameSiteServers().get("sameSite"); List<Member> sameSiteServers = serverListManager.getMembers();
if (sameSiteServers == null || sameSiteServers.size() <= 0 || !NamingProxy.getServers().contains(NetUtils.localServer())) { if (sameSiteServers == null || sameSiteServers.size() <= 0) {
return; return;
} }
for (String server : sameSiteServers) { for (Member server : sameSiteServers) {
if (server.equals(NetUtils.localServer())) { if (server.getKey().equals(NetUtils.localServer())) {
continue; continue;
} }
synchronizer.send(server, msg); synchronizer.send(server.getKey(), msg);
} }
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -17,6 +17,8 @@ package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.naming.boot.RunningConfig; import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.members.Member;
import com.alibaba.nacos.naming.core.Cluster; import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.IpAddress; import com.alibaba.nacos.naming.core.IpAddress;
@ -48,6 +50,9 @@ public class HealthCheckCommon {
@Autowired @Autowired
private SwitchDomain switchDomain; private SwitchDomain switchDomain;
@Autowired
private ServerListManager serverListManager;
@Autowired @Autowired
private PushService pushService; private PushService pushService;
@ -71,14 +76,14 @@ public class HealthCheckCommon {
List list = Arrays.asList(healthCheckResults.toArray()); List list = Arrays.asList(healthCheckResults.toArray());
healthCheckResults.clear(); healthCheckResults.clear();
List<String> sameSiteServers = NamingProxy.getSameSiteServers().get("sameSite"); List<Member> sameSiteServers = serverListManager.getMembers();
if (sameSiteServers == null || sameSiteServers.size() <= 0 || !NamingProxy.getServers().contains(NetUtils.localServer())) { if (sameSiteServers == null || sameSiteServers.size() <= 0) {
return; return;
} }
for (String server : sameSiteServers) { for (Member server : sameSiteServers) {
if (server.equals(NetUtils.localServer())) { if (server.getKey().equals(NetUtils.localServer())) {
continue; continue;
} }
Map<String, String> params = new HashMap<>(10); Map<String, String> params = new HashMap<>(10);
@ -87,10 +92,8 @@ public class HealthCheckCommon {
Loggers.DEBUG_LOG.debug("[HEALTH-SYNC] server: {}, healthCheckResults: {}", Loggers.DEBUG_LOG.debug("[HEALTH-SYNC] server: {}, healthCheckResults: {}",
server, JSON.toJSONString(list)); server, JSON.toJSONString(list));
} }
if (!server.contains(":")) {
server = server + ":" + RunningConfig.getServerPort(); HttpClient.HttpResult httpResult = HttpClient.httpPost("http://" + server.getKey()
}
HttpClient.HttpResult httpResult = HttpClient.httpPost("http://" + server
+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ "/api/healthCheckResult", null, params); + "/api/healthCheckResult", null, params);

View File

@ -36,150 +36,6 @@ import static com.alibaba.nacos.common.util.SystemUtils.*;
*/ */
public class NamingProxy { public class NamingProxy {
private static volatile List<String> servers;
private static List<String> serverlistFromConfig;
private static List<String> lastServers = new ArrayList<String>();
private static Map<String, List<String>> serverListMap = new ConcurrentHashMap<String, List<String>>();
private static long lastSrvRefTime = 0L;
/**
* records last time that query site info of servers and localhost from armory
*/
private static long lastSrvSiteRefreshTime = 0L;
private static long VIP_SRV_REF_INTER_MILLIS = TimeUnit.SECONDS.toMillis(30);
/**
* query site info of servers and localhost every 12 hours
*/
private static final long VIP_SRV_SITE_REF_INTER_MILLIS = TimeUnit.HOURS.toMillis(1);
private static String jmenv;
public static String getJmenv() {
jmenv = SystemUtils.getSystemEnv("nacos_jmenv_domain");
if (StringUtils.isEmpty(jmenv)) {
jmenv = System.getProperty("com.alibaba.nacos.naming.jmenv", "jmenv.tbsite.net");
}
if (StringUtils.isEmpty(jmenv)) {
jmenv = "jmenv.tbsite.net";
}
return jmenv;
}
private static void refreshSrvSiteIfNeed() {
refreshSrvIfNeed();
try {
if (System.currentTimeMillis() - lastSrvSiteRefreshTime > VIP_SRV_SITE_REF_INTER_MILLIS ||
!CollectionUtils.isEqualCollection(servers, lastServers)) {
if (!CollectionUtils.isEqualCollection(servers, lastServers)) {
Loggers.SRV_LOG.info("[REFRESH-SERVER-SITE] server list is changed, old: {}, new: {}",
lastServers, servers);
}
lastServers = servers;
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("fail to query server site: ", e);
}
}
public static List<String> getServers() {
refreshSrvIfNeed();
return servers;
}
public static void refreshSrvIfNeed() {
try {
if (System.currentTimeMillis() - lastSrvRefTime < VIP_SRV_REF_INTER_MILLIS) {
return;
}
if (STANDALONE_MODE) {
servers = new ArrayList<>();
servers.add(NetUtils.localServer());
return;
}
List<String> serverlist = refreshServerListFromDisk();
if (CollectionUtils.isNotEmpty(serverlist)) {
serverlistFromConfig = serverlist;
}
if (CollectionUtils.isNotEmpty(serverlistFromConfig)) {
servers = serverlistFromConfig;
}
if (RunningConfig.getServerPort() > 0) {
lastSrvRefTime = System.currentTimeMillis();
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("failed to update server list", e);
}
}
public static List<String> refreshServerListFromDisk() {
List<String> result = new ArrayList<>();
// read nacos config if necessary.
try {
result = readClusterConf();
} catch (Exception e) {
Loggers.SRV_LOG.warn("failed to get config: " + CLUSTER_CONF_FILE_PATH, e);
}
if (Loggers.DEBUG_LOG.isDebugEnabled()) {
Loggers.DEBUG_LOG.debug("REFRESH-SERVER-LIST1 {}", result);
}
//use system env
if (CollectionUtils.isEmpty(result)) {
result = SystemUtils.getIPsBySystemEnv(UtilsAndCommons.SELF_SERVICE_CLUSTER_ENV);
if (Loggers.DEBUG_LOG.isDebugEnabled()) {
Loggers.DEBUG_LOG.debug("REFRESH-SERVER-LIST4: {}", result);
}
}
if (Loggers.DEBUG_LOG.isDebugEnabled()) {
Loggers.DEBUG_LOG.debug("REFRESH-SERVER-LIST2 {}", result);
}
if (!result.isEmpty() && !result.get(0).contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
for (int i = 0; i < result.size(); i++) {
result.set(i, result.get(i) + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort());
}
}
return result;
}
/**
* This method will classify all servers as two kinds of servers: servers in the same site with local host and others
*
* @return servers
*/
public static ConcurrentHashMap<String, List<String>> getSameSiteServers() {
refreshSrvSiteIfNeed();
List<String> snapshot = servers;
ConcurrentHashMap<String, List<String>> servers = new ConcurrentHashMap<>(2);
servers.put("sameSite", snapshot);
servers.put("otherSite", new ArrayList<String>());
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("sameSiteServers: {}", servers.toString());
}
return servers;
}
public static String reqAPI(String api, Map<String, String> params, String curServer, boolean isPost) throws Exception { public static String reqAPI(String api, Map<String, String> params, String curServer, boolean isPost) throws Exception {
try { try {
List<String> headers = Arrays.asList("Client-Version", UtilsAndCommons.SERVER_VERSION, List<String> headers = Arrays.asList("Client-Version", UtilsAndCommons.SERVER_VERSION,
@ -219,29 +75,4 @@ public class NamingProxy {
} }
return StringUtils.EMPTY; return StringUtils.EMPTY;
} }
public static String getEnv() {
try {
String urlString = "http://" + getJmenv() + ":8080" + "/env";
List<String> headers = Arrays.asList("Client-Version", UtilsAndCommons.SERVER_VERSION,
"Accept-Encoding", "gzip,deflate,sdch",
"Connection", "Keep-Alive");
HttpClient.HttpResult result = HttpClient.httpGet(urlString, headers, null);
if (HttpURLConnection.HTTP_OK != result.code) {
throw new IOException("Error while requesting: " + urlString + "'. Server returned: "
+ result.code);
}
String content = result.content;
return content.trim();
} catch (Exception e) {
Loggers.SRV_LOG.warn("failed to get env", e);
}
return "sh";
}
} }

View File

@ -30,10 +30,11 @@ public class NetUtils {
private static String serverAddress = null; private static String serverAddress = null;
public static String localServer() { public static String getLocalAddress() {
try { try {
if (StringUtils.isNotBlank(serverAddress)) { if (StringUtils.isNotBlank(serverAddress)) {
return serverAddress + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort(); return serverAddress;
} }
InetAddress inetAddress = InetAddress.getLocalHost(); InetAddress inetAddress = InetAddress.getLocalHost();
@ -45,12 +46,16 @@ public class NetUtils {
serverAddress = inetAddress.getCanonicalHostName(); serverAddress = inetAddress.getCanonicalHostName();
} }
} }
return serverAddress + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort(); return serverAddress;
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
return "resolve_failed"; return "resolve_failed";
} }
} }
public static String localServer() {
return getLocalAddress() + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort();
}
public static String num2ip(int ip) { public static String num2ip(int ip) {
int[] b = new int[4]; int[] b = new int[4];
String x = ""; String x = "";

View File

@ -25,6 +25,7 @@ import com.alibaba.nacos.common.util.Md5Utils;
import com.alibaba.nacos.common.util.SystemUtils; import com.alibaba.nacos.common.util.SystemUtils;
import com.alibaba.nacos.core.utils.WebUtils; import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.boot.RunningConfig; import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.core.*; import com.alibaba.nacos.naming.core.*;
import com.alibaba.nacos.naming.exception.NacosException; import com.alibaba.nacos.naming.exception.NacosException;
import com.alibaba.nacos.naming.healthcheck.*; import com.alibaba.nacos.naming.healthcheck.*;
@ -73,6 +74,9 @@ public class ApiCommands {
@Autowired @Autowired
private SwitchManager switchManager; private SwitchManager switchManager;
@Autowired
private ServerListManager serverListManager;
@Autowired @Autowired
private SwitchDomain switchDomain; private SwitchDomain switchDomain;
@ -913,8 +917,8 @@ public class ApiCommands {
@ResponseBody @ResponseBody
public JSONObject srvIPXT(HttpServletRequest request) throws Exception { public JSONObject srvIPXT(HttpServletRequest request) throws Exception {
if (distroMapper.getLocalhostIP().equals(UtilsAndCommons.LOCAL_HOST_IP)) { if (NetUtils.localServer().equals(UtilsAndCommons.LOCAL_HOST_IP)) {
throw new Exception("invalid localhost ip: " + distroMapper.getLocalhostIP()); throw new Exception("invalid localhost ip: " + NetUtils.localServer());
} }
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID, String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
@ -1456,7 +1460,7 @@ public class ApiCommands {
String domsStatusString = WebUtils.required(request, "domsStatus"); String domsStatusString = WebUtils.required(request, "domsStatus");
String serverIP = WebUtils.optional(request, "clientIP", ""); String serverIP = WebUtils.optional(request, "clientIP", "");
if (!NamingProxy.getServers().contains(serverIP)) { if (!serverListManager.contains(serverIP)) {
throw new IllegalArgumentException("ip: " + serverIP + " is not in serverlist"); throw new IllegalArgumentException("ip: " + serverIP + " is not in serverlist");
} }

View File

@ -8,3 +8,20 @@ spring.datasource.max-wait=10000
spring.datasource.max-active=15 spring.datasource.max-active=15
## Validate the connection before borrowing it from the pool. ## Validate the connection before borrowing it from the pool.
#spring.datasource.test-on-borrow=true #spring.datasource.test-on-borrow=true
management.metrics.export.elastic.enabled=false
#management.metrics.export.elastic.host=http://localhost:9200
# metrics for influx
management.metrics.export.influx.enabled=false
#management.metrics.export.influx.db=springboot
#management.metrics.export.influx.uri=http://localhost:8086
#management.metrics.export.influx.auto-create-db=true
#management.metrics.export.influx.consistency=one
#management.metrics.export.influx.compressed=true
server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D
# default current work dir
server.tomcat.basedir=

View File

@ -15,7 +15,7 @@
*/ */
package com.alibaba.nacos.naming; package com.alibaba.nacos.naming;
import com.alibaba.nacos.naming.consistency.persistent.simpleraft.PeerSet; import com.alibaba.nacos.naming.consistency.persistent.simpleraft.RaftPeerSet;
import com.alibaba.nacos.naming.consistency.persistent.simpleraft.RaftCore; import com.alibaba.nacos.naming.consistency.persistent.simpleraft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.simpleraft.RaftPeer; import com.alibaba.nacos.naming.consistency.persistent.simpleraft.RaftPeer;
import com.alibaba.nacos.naming.core.ServiceManager; import com.alibaba.nacos.naming.core.ServiceManager;
@ -34,7 +34,7 @@ public class BaseTest {
public ServiceManager domainsManager; public ServiceManager domainsManager;
@Mock @Mock
public PeerSet peerSet; public RaftPeerSet peerSet;
@Mock @Mock
public RaftCore raftCore; public RaftCore raftCore;

View File

@ -19,7 +19,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.naming.BaseTest; import com.alibaba.nacos.naming.BaseTest;
import com.alibaba.nacos.naming.consistency.persistent.simpleraft.PeerSet; import com.alibaba.nacos.naming.consistency.persistent.simpleraft.RaftPeerSet;
import com.alibaba.nacos.naming.core.Cluster; import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.IpAddress; import com.alibaba.nacos.naming.core.IpAddress;
import com.alibaba.nacos.naming.core.VirtualClusterDomain; import com.alibaba.nacos.naming.core.VirtualClusterDomain;
@ -57,7 +57,7 @@ public class InstanceControllerTest extends BaseTest {
private InstanceController instanceController; private InstanceController instanceController;
@Mock @Mock
private PeerSet peerSet; private RaftPeerSet peerSet;
private MockMvc mockmvc; private MockMvc mockmvc;