This commit is contained in:
nkorange 2019-01-30 17:29:34 +08:00
parent d3a4834672
commit 4c03773973
16 changed files with 68 additions and 53 deletions

View File

@ -139,7 +139,7 @@ public class ServerListManager {
public void run() {
for (ServerChangeListener listener : listeners) {
listener.onChangeServerList(servers);
listener.onChangeHealthServerList(healthyServers);
listener.onChangeHealthyServerList(healthyServers);
}
}
});

View File

@ -13,14 +13,14 @@ public interface ServerChangeListener {
/**
* If member list changed, this method is invoked.
*
* @param latestMembers servers after chang
* @param servers servers after change
*/
void onChangeServerList(List<Server> latestMembers);
void onChangeServerList(List<Server> servers);
/**
* If reachable member list changed, this method is invoked.
*
* @param latestReachableMembers reachable servers after change
* @param healthyServer reachable servers after change
*/
void onChangeHealthServerList(List<Server> latestReachableMembers);
void onChangeHealthyServerList(List<Server> healthyServer);
}

View File

@ -87,13 +87,11 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService {
@Override
public boolean isResponsible(String key) {
// TODO convert key to service name:
return distroMapper.responsible(key);
return distroMapper.responsible(KeyBuilder.getServiceName(key));
}
@Override
public String getResponsibleServer(String key) {
// TODO convert key to service name:
return distroMapper.mapSrv(key);
return distroMapper.mapSrv(KeyBuilder.getServiceName(key));
}
}

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.naming.cluster.servers.Server;
import com.alibaba.nacos.naming.cluster.servers.ServerChangeListener;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
@ -71,15 +72,23 @@ public class DataSyncer implements ServerChangeListener {
public void submit(SyncTask task) {
Iterator<String> iterator = task.getKeys().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
// associated key already exist:
iterator.remove();
// If it's a new task:
if (task.getRetryCount() == 0) {
Iterator<String> iterator = task.getKeys().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
// associated key already exist:
iterator.remove();
}
}
}
if (task.getKeys().isEmpty()) {
// all keys are removed:
return;
}
GlobalExecutor.submitDataSync(new Runnable() {
@Override
public void run() {
@ -92,8 +101,18 @@ public class DataSyncer implements ServerChangeListener {
List<String> keys = task.getKeys();
Loggers.EPHEMERAL.info("sync keys: {}", keys);
Map<String, Datum<?>> datumMap = dataStore.batchGet(keys);
if (datumMap == null || datumMap.isEmpty()) {
// clear all flags of this task:
for (String key : task.getKeys()) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
return;
}
byte[] data = serializer.serialize(datumMap);
long timestamp = System.currentTimeMillis();
@ -130,14 +149,19 @@ public class DataSyncer implements ServerChangeListener {
Map<String, Long> keyTimestamps = new HashMap<>(64);
for (String key : dataStore.keys()) {
if (!distroMapper.responsible(key)) {
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
// this key is no longer in our hands:
Loggers.EPHEMERAL.warn("remove key: {}", key);
dataStore.remove(key);
continue;
}
keyTimestamps.put(key, dataStore.get(key).timestamp.get());
}
if (keyTimestamps.isEmpty()) {
return;
}
for (Server member : servers) {
NamingProxy.syncTimestamps(keyTimestamps, member.getKey());
}
@ -158,7 +182,7 @@ public class DataSyncer implements ServerChangeListener {
}
@Override
public void onChangeHealthServerList(List<Server> healthServers) {
public void onChangeHealthyServerList(List<Server> healthServers) {
servers = healthServers;
}
}

View File

@ -196,11 +196,11 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
@Override
public boolean isResponsible(String key) {
return distroMapper.responsible(key);
return distroMapper.responsible(KeyBuilder.getServiceName(key));
}
@Override
public String getResponsibleServer(String key) {
return distroMapper.mapSrv(key);
return distroMapper.mapSrv(KeyBuilder.getServiceName(key));
}
}

View File

@ -276,7 +276,7 @@ public class RaftPeerSet implements ServerChangeListener {
}
@Override
public void onChangeHealthServerList(List<Server> latestReachableMembers) {
public void onChangeHealthyServerList(List<Server> latestReachableMembers) {
}
}

View File

@ -257,7 +257,7 @@ public class InstanceController {
}
@RequestMapping("/listWithHealthStatus")
@RequestMapping("/instance/listWithHealthStatus")
public JSONObject listWithHealthStatus(HttpServletRequest request) throws NacosException {
String key = WebUtils.required(request, "key");

View File

@ -26,6 +26,7 @@ import com.alibaba.nacos.naming.consistency.ephemeral.partition.PartitionConsist
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@ -55,6 +56,9 @@ public class PartitionController {
@RequestMapping("/onSync")
public String onSync(HttpServletRequest request, HttpServletResponse response) throws Exception {
byte[] data = IoUtils.tryDecompress(request.getInputStream());
String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
Map<String, Datum<Instances>> dataMap =
serializer.deserializeMap(data, Instances.class);

View File

@ -63,8 +63,6 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
@JSONField(serialize = false)
private Service dom;
private Map<String, Boolean> ipContains = new ConcurrentHashMap<>();
private Map<String, String> metadata = new ConcurrentHashMap<>();
public Cluster() {

View File

@ -19,7 +19,6 @@ import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.servers.Server;
import com.alibaba.nacos.naming.cluster.servers.ServerChangeListener;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import org.apache.commons.collections.CollectionUtils;
@ -27,7 +26,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
/**
* @author nkorange
@ -62,7 +62,7 @@ public class DistroMapper implements ServerChangeListener {
&& cluster.contains(instance);
}
public boolean responsible(String dom) {
public boolean responsible(String serviceName) {
if (!switchDomain.isDistroEnabled()) {
return true;
}
@ -78,7 +78,7 @@ public class DistroMapper implements ServerChangeListener {
return true;
}
int target = distroHash(dom) % healthyList.size();
int target = distroHash(serviceName) % healthyList.size();
return target >= index && target <= lastIndex;
}
@ -106,7 +106,7 @@ public class DistroMapper implements ServerChangeListener {
}
@Override
public void onChangeHealthServerList(List<Server> latestReachableMembers) {
public void onChangeHealthyServerList(List<Server> latestReachableMembers) {
List<String> newHealthyList = new ArrayList<>();
for (Server server : latestReachableMembers) {

View File

@ -109,7 +109,6 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
this.enabled = enabled;
}
public long getLastModifiedMillis() {
return lastModifiedMillis;
}

View File

@ -194,8 +194,7 @@ public class ServiceManager implements DataListener<Service> {
}
private class UpdatedDomainProcessor implements Runnable {
//get changed domain from other server asynchronously
//get changed domain from other server asynchronously
@Override
public void run() {
String domName = null;
@ -262,13 +261,13 @@ public class ServiceManager implements DataListener<Service> {
ipsMap.put(strings[0], strings[1]);
}
Service raftService = (Service) getService(namespaceId, domName);
Service service = getService(namespaceId, domName);
if (raftService == null) {
if (service == null) {
return;
}
List<Instance> instances = raftService.allIPs();
List<Instance> instances = service.allIPs();
for (Instance instance : instances) {
Boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIPAddr()));
@ -280,14 +279,14 @@ public class ServiceManager implements DataListener<Service> {
}
}
pushService.domChanged(raftService.getNamespaceId(), raftService.getName());
pushService.domChanged(service.getNamespaceId(), service.getName());
StringBuilder stringBuilder = new StringBuilder();
List<Instance> allIps = raftService.allIPs();
List<Instance> allIps = service.allIPs();
for (Instance instance : allIps) {
stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isValid()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] dom: {}, ips: {}", raftService.getName(), stringBuilder.toString());
Loggers.EVT_LOG.info("[IP-UPDATED] dom: {}, ips: {}", service.getName(), stringBuilder.toString());
}

View File

@ -30,11 +30,11 @@ import java.util.*;
*/
public class NamingProxy {
private static final String DATA_ON_SYNC_URL = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/partition/onSync";
private static final String DATA_ON_SYNC_URL = "/partition/onSync";
private static final String DATA_GET_URL = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/partition/get";
private static final String DATA_GET_URL = "/partition/get";
private static final String TIMESTAMP_SYNC_URL = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/partition/syncTimestamps";
private static final String TIMESTAMP_SYNC_URL = "/partition/syncTimestamps";
public static void syncTimestamps(Map<String, Long> timestamps, String server) {
@ -144,9 +144,7 @@ public class NamingProxy {
return StringUtils.EMPTY;
}
throw new IOException("failed to req API:" + "http://" + curServer
+ RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api + ". code:"
throw new IOException("failed to req API:" + "http://" + curServer + api + ". code:"
+ result.code + " msg: " + result.content);
} catch (Exception e) {
Loggers.SRV_LOG.warn("NamingProxy", e);

View File

@ -42,7 +42,7 @@ public class ServerStatusSynchronizer implements Synchronizer {
if (serverIP.contains(UtilsAndCommons.IP_PORT_SPLITER)) {
url = "http://" + serverIP + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ "/api/serverStatus";
+ "/operator/serverStatus";
}
try {

View File

@ -15,6 +15,7 @@
*/
package com.alibaba.nacos.naming.selector;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.api.selector.SelectorType;
import com.alibaba.nacos.naming.core.Instance;
@ -26,7 +27,7 @@ import java.util.List;
* @author nkorange
* @since 0.7.0
*/
public class NoneSelector extends com.alibaba.nacos.api.selector.AbstractSelector implements Selector {
public class NoneSelector extends AbstractSelector implements Selector {
public NoneSelector() {
this.setType(SelectorType.none.name());

View File

@ -16,17 +16,14 @@
package com.alibaba.nacos.naming.web;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpMethod;
import javax.annotation.Resource;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -38,9 +35,6 @@ import java.util.Map;
*/
public class DistroFilter implements Filter {
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
@Autowired
private DistroMapper distroMapper;
@ -74,9 +68,9 @@ public class DistroFilter implements Filter {
String serviceName = req.getParameter(CommonParams.SERVICE_NAME);
if (StringUtils.isNoneBlank(serviceName) && !HttpMethod.GET.name().equals(req.getMethod())
&& !consistencyService.isResponsible(serviceName)) {
&& !distroMapper.responsible(serviceName)) {
String url = "http://" + consistencyService.getResponsibleServer(serviceName) +
String url = "http://" + distroMapper.mapSrv(serviceName) +
req.getRequestURI() + "?" + req.getQueryString();
try {
resp.sendRedirect(url);