#744 Add server status, #629 Add partition service warm up logic

This commit is contained in:
nkorange 2019-02-09 15:43:03 +08:00
parent cba85f0635
commit 58b8d1640f
38 changed files with 608 additions and 291 deletions

View File

@ -22,8 +22,6 @@ package com.alibaba.nacos.api.common;
*/
public class Constants {
public static final String CLIENT_VERSION_HEADER = "Client-Version";
public static final String CLIENT_VERSION = "3.0.0";
public static int DATA_IN_BODY_VERSION = 204;

View File

@ -101,6 +101,7 @@ public class NamingProxy {
String urlString = "http://" + endpoint + "/nacos/serverlist";
List<String> headers = Arrays.asList("Client-Version", UtilAndComs.VERSION,
"User-Agent", UtilAndComs.VERSION,
"Accept-Encoding", "gzip,deflate,sdch",
"Connection", "Keep-Alive",
"RequestId", UuidUtils.generateUuid());
@ -301,6 +302,7 @@ public class NamingProxy {
long end = 0;
List<String> headers = Arrays.asList("Client-Version", UtilAndComs.VERSION,
"User-Agent", UtilAndComs.VERSION,
"Accept-Encoding", "gzip,deflate,sdch",
"Connection", "Keep-Alive",
"RequestId", UuidUtils.generateUuid());

View File

@ -56,6 +56,11 @@ public class AuthChecker {
return;
}
agent = req.getHeader("User-Agent");
if (StringUtils.startsWith(agent, UtilsAndCommons.NACOS_SERVER_HEADER)) {
return;
}
throw new IllegalAccessException("illegal access,agent= " + agent + ", token=" + token);
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster;
/**
* A flag to indicate the exact status of a server.
*
* @author nkorange
* @since 1.0.0
*/
public enum ServerStatus {
/**
* server is up and ready for request
*/
UP,
/**
* server is out of service, something abnormal happened
*/
DOWN,
/**
* server is preparing itself for request, usually 'UP' is the next status
*/
STARTING,
/**
* server is manually paused
*/
PAUSED,
/**
* only write operation is permitted.
*/
WRITE_ONLY,
/**
* only read operation is permitted.
*/
READY_ONLY
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
* Detect and control the working status of local server
*
* @author nkorange
* @since 1.0.0
*/
@Service
public class ServerStatusManager {
@Autowired
private ConsistencyService consistencyService;
@Autowired
private SwitchDomain switchDomain;
private ServerStatus serverStatus = ServerStatus.STARTING;
private boolean serverStatusLocked = false;
@PostConstruct
public void init() {
GlobalExecutor.registerServerStatusUpdater(new ServerStatusUpdater());
}
private void refreshServerStatus() {
if (StringUtils.isNotBlank(switchDomain.getOverriddenServerStatus())) {
serverStatus = ServerStatus.valueOf(switchDomain.getOverriddenServerStatus());
return;
}
if (consistencyService.isAvailable()) {
serverStatus = ServerStatus.UP;
} else {
serverStatus = ServerStatus.DOWN;
}
}
public ServerStatus getServerStatus() {
return serverStatus;
}
public class ServerStatusUpdater implements Runnable {
@Override
public void run() {
refreshServerStatus();
}
}
}

View File

@ -93,4 +93,11 @@ public interface ConsistencyService {
* @return responsible server for the data
*/
String getResponsibleServer(String key);
/**
* Tell the status of this consistency service
*
* @return true if available
*/
boolean isAvailable();
}

View File

@ -20,7 +20,7 @@ import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyServic
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
* Publish execution delegate
@ -28,7 +28,7 @@ import org.springframework.stereotype.Component;
* @author nkorange
* @since 1.0.0
*/
@Component("consistencyDelegate")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {
@Autowired
@ -94,4 +94,9 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService {
public String getResponsibleServer(String key) {
return distroMapper.mapSrv(KeyBuilder.getServiceName(key));
}
@Override
public boolean isAvailable() {
return ephemeralConsistencyService.isAvailable() && persistentConsistencyService.isAvailable();
}
}

View File

@ -15,6 +15,7 @@
*/
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import com.alibaba.nacos.common.util.IoUtils;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.servers.Server;
import com.alibaba.nacos.naming.cluster.servers.ServerChangeListener;
@ -29,12 +30,17 @@ import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.commons.lang3.CharEncoding.UTF_8;
/**
* Data replicator
*
@ -48,6 +54,9 @@ public class DataSyncer implements ServerChangeListener {
@Autowired
private DataStore dataStore;
@Autowired
private PartitionConfig partitionConfig;
@Autowired
private Serializer serializer;
@ -61,6 +70,8 @@ public class DataSyncer implements ServerChangeListener {
private List<Server> servers;
private boolean initialized = false;
@PostConstruct
public void init() {
serverListManager.listen(this);
@ -144,6 +155,31 @@ public class DataSyncer implements ServerChangeListener {
@Override
public void run() {
try {
File metaFile = new File(UtilsAndCommons.DATA_BASE_DIR + File.separator + "ephemeral.properties");
if (initialized) {
// write the current instance count to disk:
IoUtils.writeStringToFile(metaFile, "instanceCount=" + dataStore.keys().size(), "UTF-8");
} else {
// check if most of the data are loaded:
List<String> lines = IoUtils.readLines(new InputStreamReader(new FileInputStream(metaFile), UTF_8));
if (lines == null || lines.isEmpty()) {
initialized = true;
} else {
int desiredInstanceCount = Integer.parseInt(lines.get(0).split("=")[1]);
if (desiredInstanceCount * partitionConfig.getInitDataRatio() < dataStore.keys().size()) {
initialized = true;
}
}
}
} catch (Exception e) {
Loggers.EPHEMERAL.error("operate on meta file failed.", e);
}
try {
// send local timestamps to other servers:
Map<String, Long> keyTimestamps = new HashMap<>(64);
for (String key : dataStore.keys()) {
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
@ -162,6 +198,9 @@ public class DataSyncer implements ServerChangeListener {
}
NamingProxy.syncTimestamps(keyTimestamps, member.getKey());
}
} catch (Exception e) {
Loggers.EPHEMERAL.error("timed sync task failed.", e);
}
}
}
@ -173,6 +212,10 @@ public class DataSyncer implements ServerChangeListener {
return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
}
public boolean isInitialized() {
return initialized;
}
@Override
public void onChangeServerList(List<Server> latestMembers) {

View File

@ -33,6 +33,9 @@ public class PartitionConfig {
@Value("${nacos.naming.partition.batchSyncKeyCount}")
private int batchSyncKeyCount = 1000;
@Value("${nacos.naming.partition.initDataRatio}")
private float initDataRatio = 0.9F;
public int getTaskDispatchPeriod() {
return taskDispatchPeriod;
}
@ -40,4 +43,8 @@ public class PartitionConfig {
public int getBatchSyncKeyCount() {
return batchSyncKeyCount;
}
public float getInitDataRatio() {
return initDataRatio;
}
}

View File

@ -22,12 +22,11 @@ import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@ -48,7 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @author nkorange
* @since 1.0.0
*/
@Component("partitionConsistencyService")
@Service("partitionConsistencyService")
public class PartitionConsistencyServiceImpl implements EphemeralConsistencyService {
@Autowired
@ -60,6 +59,9 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
@Autowired
private TaskDispatcher taskDispatcher;
@Autowired
private DataSyncer dataSyncer;
@Autowired
private Serializer serializer;
@ -207,4 +209,9 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
public String getResponsibleServer(String key) {
return distroMapper.mapSrv(KeyBuilder.getServiceName(key));
}
@Override
public boolean isAvailable() {
return dataSyncer.isInitialized();
}
}

View File

@ -21,15 +21,15 @@ import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.misc.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
* Using simplified Raft protocol to maintain the consistency status of Nacos cluster.
* Use simplified Raft protocol to maintain the consistency status of Nacos cluster.
*
* @author nkorange
* @since 1.0.0
*/
@Component
@Service
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {
@Autowired
@ -80,6 +80,11 @@ public class RaftConsistencyServiceImpl implements PersistentConsistencyService
return null;
}
@Override
public boolean isAvailable() {
return true;
}
public void onPut(Datum datum, RaftPeer source) throws NacosException {
try {
raftCore.onPublish(datum, source);

View File

@ -286,7 +286,7 @@ public class RaftCore {
// if data should be persistent, usually this is always true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
RaftStore.write(datum);
raftStore.write(datum);
}
datums.put(datum.key, datum);
@ -672,7 +672,7 @@ public class RaftCore {
continue;
}
RaftStore.write(datum);
raftStore.write(datum);
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
Datum<Service> serviceDatum = new Datum<>();
@ -862,7 +862,7 @@ public class RaftCore {
}
// FIXME should we ignore the value of 'deleted'?
if (deleted != null) {
RaftStore.delete(deleted);
raftStore.delete(deleted);
notifier.addTask(deleted, ApplyAction.DELETE);
Loggers.RAFT.info("datum deleted, key: {}", key);
}

View File

@ -24,6 +24,7 @@ import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
@ -40,31 +41,21 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import static com.alibaba.nacos.common.util.SystemUtils.NACOS_HOME;
import static com.alibaba.nacos.common.util.SystemUtils.NACOS_HOME_KEY;
/**
* @author nacos
*/
@Component
public class RaftStore {
private static String BASE_DIR = NACOS_HOME + File.separator + "raft";
private static String META_FILE_NAME;
private Properties meta = new Properties();
private static String CACHE_DIR;
private String metaFileName;
static {
private String cacheDir;
if (StringUtils.isNotBlank(System.getProperty(NACOS_HOME_KEY))) {
BASE_DIR = NACOS_HOME + File.separator + "data" + File.separator + "naming";
}
META_FILE_NAME = BASE_DIR + File.separator + "meta.properties";
CACHE_DIR = BASE_DIR + File.separator + "data";
public RaftStore() {
metaFileName = UtilsAndCommons.DATA_BASE_DIR + File.separator + "meta.properties";
cacheDir = UtilsAndCommons.DATA_BASE_DIR + File.separator + "data";
}
public synchronized ConcurrentHashMap<String, Datum<?>> loadDatums(RaftCore.Notifier notifier) throws Exception {
@ -94,7 +85,7 @@ public class RaftStore {
}
public synchronized Properties loadMeta() throws Exception {
File metaFile = new File(META_FILE_NAME);
File metaFile = new File(metaFileName);
if (!metaFile.exists() && !metaFile.getParentFile().mkdirs() && !metaFile.createNewFile()) {
throw new IllegalStateException("failed to create meta file: " + metaFile.getAbsolutePath());
}
@ -186,16 +177,16 @@ public class RaftStore {
}
}
public synchronized static void write(final Datum datum) throws Exception {
public synchronized void write(final Datum datum) throws Exception {
String namespaceId = KeyBuilder.getNamespace(datum.key);
File cacheFile;
if (StringUtils.isNotBlank(namespaceId)) {
cacheFile = new File(CACHE_DIR + File.separator + namespaceId + File.separator + encodeFileName(datum.key));
cacheFile = new File(cacheDir + File.separator + namespaceId + File.separator + encodeFileName(datum.key));
} else {
cacheFile = new File(CACHE_DIR + File.separator + encodeFileName(datum.key));
cacheFile = new File(cacheDir + File.separator + encodeFileName(datum.key));
}
if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {
@ -231,8 +222,8 @@ public class RaftStore {
}
}
private static File[] listCaches() throws Exception {
File cacheDir = new File(CACHE_DIR);
private File[] listCaches() throws Exception {
File cacheDir = new File(this.cacheDir);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("cloud not make out directory: " + cacheDir.getName());
}
@ -240,14 +231,14 @@ public class RaftStore {
return cacheDir.listFiles();
}
public static void delete(Datum datum) {
public void delete(Datum datum) {
// datum key contains namespace info:
String namespaceId = KeyBuilder.getNamespace(datum.key);
if (StringUtils.isNotBlank(namespaceId)) {
File cacheFile = new File(CACHE_DIR + File.separator + namespaceId + File.separator + encodeFileName(datum.key));
File cacheFile = new File(cacheDir + File.separator + namespaceId + File.separator + encodeFileName(datum.key));
if (cacheFile.exists() && !cacheFile.delete()) {
Loggers.RAFT.error("[RAFT-DELETE] failed to delete datum: {}, value: {}", datum.key, datum.value);
throw new IllegalStateException("failed to delete datum: " + datum.key);
@ -256,7 +247,7 @@ public class RaftStore {
}
public void updateTerm(long term) throws Exception {
File file = new File(META_FILE_NAME);
File file = new File(metaFileName);
if (!file.exists() && !file.getParentFile().mkdirs() && !file.createNewFile()) {
throw new IllegalStateException("failed to create meta file");
}

View File

@ -64,7 +64,7 @@ public class CatalogController {
String keyword = WebUtils.optional(request, "keyword", StringUtils.EMPTY);
List<Service> doms = new ArrayList<>();
int total = serviceManager.getPagedDom(namespaceId, page - 1, pageSize, keyword, doms);
int total = serviceManager.getPagedService(namespaceId, page - 1, pageSize, keyword, doms);
if (CollectionUtils.isEmpty(doms)) {
result.put("serviceList", Collections.emptyList());
@ -183,7 +183,7 @@ public class CatalogController {
List<ServiceDetailInfo> serviceDetailInfoList = new ArrayList<>();
serviceManager
.getDomMap(namespaceId)
.getServiceMap(namespaceId)
.forEach(
(serviceName, service) -> {
@ -237,7 +237,7 @@ public class CatalogController {
String ip = WebUtils.required(request, "ip");
Set<String> doms = new HashSet<String>();
Map<String, Set<String>> serviceNameMap = serviceManager.getAllDomNames();
Map<String, Set<String>> serviceNameMap = serviceManager.getAllServiceNames();
for (String namespaceId : serviceNameMap.keySet()) {
for (String serviceName : serviceNameMap.get(namespaceId)) {

View File

@ -62,7 +62,7 @@ public class HealthController {
@RequestMapping("/server")
public JSONObject server(HttpServletRequest request) {
JSONObject result = new JSONObject();
result.put("msg", "Hello! I am Nacos-Naming and healthy! total services: raft " + serviceManager.getDomCount()
result.put("msg", "Hello! I am Nacos-Naming and healthy! total services: raft " + serviceManager.getServiceCount()
+ ", local port:" + RunningConfig.getServerPort());
return result;
}
@ -109,7 +109,7 @@ public class HealthController {
Loggers.EVT_LOG.info((valid ? "[IP-ENABLED]" : "[IP-DISABLED]") + " ips: "
+ instance.getIp() + ":" + instance.getPort() + "@" + instance.getClusterName()
+ ", dom: " + dom + ", msg: update thought HealthController api");
pushService.domChanged(namespaceId, service.getName());
pushService.serviceChanged(namespaceId, service.getName());
break;
}
}

View File

@ -123,6 +123,9 @@ public class InstanceController {
String dom = WebUtils.required(request, CommonParams.SERVICE_NAME);
String agent = request.getHeader("Client-Version");
if (StringUtils.isBlank(agent)) {
agent = request.getHeader("User-Agent");
}
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));

View File

@ -22,6 +22,7 @@ import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.common.util.SystemUtils;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.ServerStatusManager;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
@ -62,6 +63,9 @@ public class OperatorController {
@Autowired
private ServerListManager serverListManager;
@Autowired
private ServerStatusManager serverStatusManager;
@Autowired
private SwitchDomain switchDomain;
@ -131,16 +135,17 @@ public class OperatorController {
JSONObject result = new JSONObject();
int domCount = serviceManager.getDomCount();
int domCount = serviceManager.getServiceCount();
int ipCount = serviceManager.getInstanceCount();
int responsibleDomCount = serviceManager.getResponsibleDomCount();
int responsibleIPCount = serviceManager.getResponsibleIPCount();
int responsibleDomCount = serviceManager.getResponsibleServiceCount();
int responsibleIPCount = serviceManager.getResponsibleInstanceCount();
result.put("domCount", domCount);
result.put("ipCount", ipCount);
result.put("responsibleDomCount", responsibleDomCount);
result.put("responsibleIPCount", responsibleIPCount);
result.put("status", serverStatusManager.getServerStatus().name());
result.put("serviceCount", domCount);
result.put("instanceCount", ipCount);
result.put("responsibleServiceCount", responsibleDomCount);
result.put("responsibleInstanceCount", responsibleIPCount);
result.put("cpu", SystemUtils.getCPU());
result.put("load", SystemUtils.getLoad());
result.put("mem", SystemUtils.getMem());

View File

@ -25,7 +25,6 @@ import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.persistent.raft.*;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
@ -43,7 +42,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -186,7 +184,7 @@ public class RaftController {
response.setHeader("Content-Encode", "gzip");
JSONObject result = new JSONObject();
result.put("doms", domainsManager.getDomCount());
result.put("doms", domainsManager.getServiceCount());
result.put("peers", raftCore.getPeers());
return result;

View File

@ -115,7 +115,7 @@ public class ServiceController {
throw new IllegalArgumentException("specified service has instances, serviceName : " + serviceName);
}
serviceManager.easyRemoveDom(namespaceId, serviceName);
serviceManager.easyRemoveService(namespaceId, serviceName);
return "ok";
}
@ -257,7 +257,7 @@ public class ServiceController {
Map<String, Set<String>> doms = new HashMap<>(16);
Map<String, Set<String>> domMap = serviceManager.getAllDomNames();
Map<String, Set<String>> domMap = serviceManager.getAllServiceNames();
for (String namespaceId : domMap.keySet()) {
doms.put(namespaceId, new HashSet<>());
@ -285,7 +285,7 @@ public class ServiceController {
String expr = WebUtils.required(request, "expr");
List<Service> doms
= serviceManager.searchDomains(namespaceId, ".*" + expr + ".*");
= serviceManager.searchServices(namespaceId, ".*" + expr + ".*");
if (CollectionUtils.isEmpty(doms)) {
result.put("doms", Collections.emptyList());
@ -313,13 +313,13 @@ public class ServiceController {
}
try {
ServiceManager.DomainChecksum checksums = JSON.parseObject(domsStatusString, ServiceManager.DomainChecksum.class);
ServiceManager.ServiceChecksum checksums = JSON.parseObject(domsStatusString, ServiceManager.ServiceChecksum.class);
if (checksums == null) {
Loggers.SRV_LOG.warn("[DOMAIN-STATUS] receive malformed data: null");
return "fail";
}
for (Map.Entry<String, String> entry : checksums.domName2Checksum.entrySet()) {
for (Map.Entry<String, String> entry : checksums.serviceName2Checksum.entrySet()) {
if (entry == null || StringUtils.isEmpty(entry.getKey()) || StringUtils.isEmpty(entry.getValue())) {
continue;
}
@ -338,7 +338,7 @@ public class ServiceController {
Loggers.SRV_LOG.debug("checksum of {} is not consistent, remote: {}, checksum: {}, local: {}",
dom, serverIP, checksum, domain.getChecksum());
}
serviceManager.addUpdatedDom2Queue(checksums.namespaceId, dom, serverIP, checksum);
serviceManager.addUpdatedService2Queue(checksums.namespaceId, dom, serverIP, checksum);
}
}
} catch (Exception e) {

View File

@ -213,7 +213,7 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
}
setLastModifiedMillis(System.currentTimeMillis());
getPushService().domChanged(namespaceId, getName());
getPushService().serviceChanged(namespaceId, getName());
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {

View File

@ -54,7 +54,7 @@ public class ServiceManager implements DataListener<Service> {
*/
private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
private LinkedBlockingDeque<DomainKey> toBeUpdatedDomsQueue = new LinkedBlockingDeque<>(1024 * 1024);
private LinkedBlockingDeque<ServiceKey> toBeUpdatedDomsQueue = new LinkedBlockingDeque<>(1024 * 1024);
private Synchronizer synchronizer = new DomainStatusSynchronizer();
@ -65,9 +65,9 @@ public class ServiceManager implements DataListener<Service> {
private final Lock lock = new ReentrantLock();
private Map<String, Condition> dom2ConditionMap = new ConcurrentHashMap<>();
private Map<String, Condition> service2ConditionMap = new ConcurrentHashMap<>();
private Map<String, Lock> dom2LockMap = new ConcurrentHashMap<>();
private Map<String, Lock> service2LockMap = new ConcurrentHashMap<>();
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
@ -88,14 +88,14 @@ public class ServiceManager implements DataListener<Service> {
private Serializer serializer;
/**
* thread pool that processes getting domain detail from other server asynchronously
* thread pool that processes getting service detail from other server asynchronously
*/
private ExecutorService domainUpdateExecutor
private ExecutorService serviceUpdateExecutor
= Executors.newFixedThreadPool(DOMAIN_UPDATE_EXECUTOR_NUM, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.naming.domain.update.http.handler");
t.setName("com.alibaba.nacos.naming.service.update.http.handler");
t.setDaemon(true);
return t;
}
@ -104,7 +104,7 @@ public class ServiceManager implements DataListener<Service> {
@PostConstruct
public void init() {
UtilsAndCommons.DOMAIN_SYNCHRONIZATION_EXECUTOR.schedule(new DomainReporter(), 60000, TimeUnit.MILLISECONDS);
UtilsAndCommons.DOMAIN_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
UtilsAndCommons.DOMAIN_UPDATE_EXECUTOR.submit(new UpdatedDomainProcessor());
@ -116,18 +116,18 @@ public class ServiceManager implements DataListener<Service> {
}
}
public Map<String, Service> chooseDomMap(String namespaceId) {
public Map<String, Service> chooseServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
public void addUpdatedDom2Queue(String namespaceId, String domName, String serverIP, String checksum) {
public void addUpdatedService2Queue(String namespaceId, String serviceName, String serverIP, String checksum) {
lock.lock();
try {
toBeUpdatedDomsQueue.offer(new DomainKey(namespaceId, domName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
toBeUpdatedDomsQueue.offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
} catch (Exception e) {
toBeUpdatedDomsQueue.poll();
toBeUpdatedDomsQueue.add(new DomainKey(namespaceId, domName, serverIP, checksum));
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add domain to be updatd to queue.", e);
toBeUpdatedDomsQueue.add(new ServiceKey(namespaceId, serviceName, serverIP, checksum));
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add service to be updatd to queue.", e);
} finally {
lock.unlock();
}
@ -164,16 +164,16 @@ public class ServiceManager implements DataListener<Service> {
} else {
addLockIfAbsent(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()));
putDomain(service);
putService(service);
service.init();
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-DOM-RAFT] {}", service.toJSON());
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}
wakeUp(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()));
} catch (Throwable e) {
Loggers.SRV_LOG.error("[NACOS-DOM] error while processing dom update", e);
Loggers.SRV_LOG.error("[NACOS-SERVICE] error while processing service update", e);
}
}
@ -181,78 +181,78 @@ public class ServiceManager implements DataListener<Service> {
public void onDelete(String key) throws Exception {
String namespace = KeyBuilder.getNamespace(key);
String name = KeyBuilder.getServiceName(key);
Service dom = chooseDomMap(namespace).remove(name);
Service service = chooseServiceMap(namespace).remove(name);
Loggers.RAFT.info("[RAFT-NOTIFIER] datum is deleted, key: {}", key);
if (dom != null) {
dom.destroy();
if (service != null) {
service.destroy();
consistencyService.remove(KeyBuilder.buildInstanceListKey(namespace, name, true));
consistencyService.remove(KeyBuilder.buildInstanceListKey(namespace, name, false));
consistencyService.unlisten(KeyBuilder.buildServiceMetaKey(namespace, name), dom);
Loggers.SRV_LOG.info("[DEAD-DOM] {}", dom.toJSON());
consistencyService.unlisten(KeyBuilder.buildServiceMetaKey(namespace, name), service);
Loggers.SRV_LOG.info("[DEAD-DOM] {}", service.toJSON());
}
}
private class UpdatedDomainProcessor implements Runnable {
//get changed domain from other server asynchronously
//get changed service from other server asynchronously
@Override
public void run() {
String domName = null;
String serviceName = null;
String serverIP = null;
try {
while (true) {
DomainKey domainKey = null;
ServiceKey serviceKey = null;
try {
domainKey = toBeUpdatedDomsQueue.take();
serviceKey = toBeUpdatedDomsQueue.take();
} catch (Exception e) {
Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
}
if (domainKey == null) {
if (serviceKey == null) {
continue;
}
domName = domainKey.getDomName();
serverIP = domainKey.getServerIP();
serviceName = serviceKey.getServiceName();
serverIP = serviceKey.getServerIP();
domainUpdateExecutor.execute(new DomUpdater(domainKey.getNamespaceId(), domName, serverIP));
serviceUpdateExecutor.execute(new ServiceUpdater(serviceKey.getNamespaceId(), serviceName, serverIP));
}
} catch (Exception e) {
Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update dom: {} from {}, error: {}", domName, serverIP, e);
Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {} from {}, error: {}", serviceName, serverIP, e);
}
}
}
private class DomUpdater implements Runnable {
private class ServiceUpdater implements Runnable {
String namespaceId;
String domName;
String serviceName;
String serverIP;
public DomUpdater(String namespaceId, String domName, String serverIP) {
public ServiceUpdater(String namespaceId, String serviceName, String serverIP) {
this.namespaceId = namespaceId;
this.domName = domName;
this.serviceName = serviceName;
this.serverIP = serverIP;
}
@Override
public void run() {
try {
updatedDom2(namespaceId, domName, serverIP);
updatedHealthStatus(namespaceId, serviceName, serverIP);
} catch (Exception e) {
Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update dom: {} from {}, error: {}",
domName, serverIP, e);
Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}",
serviceName, serverIP, e);
}
}
}
public void updatedDom2(String namespaceId, String domName, String serverIP) {
Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, domName));
JSONObject dom = JSON.parseObject(msg.getData());
public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
JSONObject serviceJson = JSON.parseObject(msg.getData());
JSONArray ipList = dom.getJSONArray("ips");
JSONArray ipList = serviceJson.getJSONArray("ips");
Map<String, String> ipsMap = new HashMap<>(ipList.size());
for (int i = 0; i < ipList.size(); i++) {
@ -261,7 +261,7 @@ public class ServiceManager implements DataListener<Service> {
ipsMap.put(strings[0], strings[1]);
}
Service service = getService(namespaceId, domName);
Service service = getService(namespaceId, serviceName);
if (service == null) {
return;
@ -274,27 +274,27 @@ public class ServiceManager implements DataListener<Service> {
if (valid != instance.isValid()) {
instance.setValid(valid);
Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}@{}",
domName, (instance.isValid() ? "ENABLED" : "DISABLED"),
serviceName, (instance.isValid() ? "ENABLED" : "DISABLED"),
instance.getIp(), instance.getPort(), instance.getClusterName());
}
}
pushService.domChanged(service.getNamespaceId(), service.getName());
pushService.serviceChanged(service.getNamespaceId(), service.getName());
StringBuilder stringBuilder = new StringBuilder();
List<Instance> allIps = service.allIPs();
for (Instance instance : allIps) {
stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isValid()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] dom: {}, ips: {}", service.getName(), stringBuilder.toString());
Loggers.EVT_LOG.info("[IP-UPDATED] service: {}, ips: {}", service.getName(), stringBuilder.toString());
}
public Set<String> getAllDomNames(String namespaceId) {
public Set<String> getAllServiceNames(String namespaceId) {
return serviceMap.get(namespaceId).keySet();
}
public Map<String, Set<String>> getAllDomNames() {
public Map<String, Set<String>> getAllServiceNames() {
Map<String, Set<String>> namesMap = new HashMap<>(16);
for (String namespaceId : serviceMap.keySet()) {
@ -304,51 +304,51 @@ public class ServiceManager implements DataListener<Service> {
}
public List<String> getAllDomNamesList(String namespaceId) {
if (chooseDomMap(namespaceId) == null) {
if (chooseServiceMap(namespaceId) == null) {
return new ArrayList<>();
}
return new ArrayList<>(chooseDomMap(namespaceId).keySet());
return new ArrayList<>(chooseServiceMap(namespaceId).keySet());
}
public Map<String, Set<Service>> getResponsibleDoms() {
public Map<String, Set<Service>> getResponsibleServices() {
Map<String, Set<Service>> result = new HashMap<>(16);
for (String namespaceId : serviceMap.keySet()) {
result.put(namespaceId, new HashSet<>());
for (Map.Entry<String, Service> entry : serviceMap.get(namespaceId).entrySet()) {
Service domain = entry.getValue();
Service service = entry.getValue();
if (distroMapper.responsible(entry.getKey())) {
result.get(namespaceId).add(domain);
result.get(namespaceId).add(service);
}
}
}
return result;
}
public int getResponsibleDomCount() {
int domCount = 0;
public int getResponsibleServiceCount() {
int serviceCount = 0;
for (String namespaceId : serviceMap.keySet()) {
for (Map.Entry<String, Service> entry : serviceMap.get(namespaceId).entrySet()) {
if (distroMapper.responsible(entry.getKey())) {
domCount++;
serviceCount++;
}
}
}
return domCount;
return serviceCount;
}
public int getResponsibleIPCount() {
Map<String, Set<Service>> responsibleDoms = getResponsibleDoms();
public int getResponsibleInstanceCount() {
Map<String, Set<Service>> responsibleServices = getResponsibleServices();
int count = 0;
for (String namespaceId : responsibleDoms.keySet()) {
for (Service domain : responsibleDoms.get(namespaceId)) {
count += domain.allIPs().size();
for (String namespaceId : responsibleServices.keySet()) {
for (Service service : responsibleServices.get(namespaceId)) {
count += service.allIPs().size();
}
}
return count;
}
public void easyRemoveDom(String namespaceId, String serviceName) throws Exception {
public void easyRemoveService(String namespaceId, String serviceName) throws Exception {
consistencyService.remove(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName));
}
@ -374,7 +374,7 @@ public class ServiceManager implements DataListener<Service> {
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
// now validate the dom. if failed, exception will be thrown
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
service.valid();
@ -449,9 +449,9 @@ public class ServiceManager implements DataListener<Service> {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service dom = getService(namespaceId, serviceName);
Service service = getService(namespaceId, serviceName);
Map<String, Instance> instanceMap = substractIpAddresses(dom, ephemeral, ips);
Map<String, Instance> instanceMap = substractIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceMap(instanceMap);
@ -482,9 +482,9 @@ public class ServiceManager implements DataListener<Service> {
return null;
}
public Map<String, Instance> updateIpAddresses(Service dom, String action, boolean ephemeral, Instance... ips) throws NacosException {
public Map<String, Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(dom.getNamespaceId(), dom.getName(), ephemeral));
Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
Map<String, Instance> oldInstances = new HashMap<>(16);
@ -493,7 +493,7 @@ public class ServiceManager implements DataListener<Service> {
}
Map<String, Instance> instances;
List<Instance> currentIPs = dom.allIPs(ephemeral);
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());
for (Instance instance : currentIPs) {
@ -507,10 +507,10 @@ public class ServiceManager implements DataListener<Service> {
instanceMap.putAll(instances);
for (Instance instance : ips) {
if (!dom.getClusterMap().containsKey(instance.getClusterName())) {
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName());
cluster.setDom(dom);
dom.getClusterMap().put(instance.getClusterName(), cluster);
cluster.setDom(service);
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJSON());
}
@ -524,19 +524,19 @@ public class ServiceManager implements DataListener<Service> {
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException("ip list can not be empty, dom: " + dom.getName() + ", ip list: "
throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
+ JSON.toJSONString(instanceMap.values()));
}
return instanceMap;
}
public Map<String, Instance> substractIpAddresses(Service dom, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(dom, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);
public Map<String, Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);
}
public Map<String, Instance> addIpAddresses(Service dom, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(dom, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
public Map<String, Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}
private Map<String, Instance> setValid(Map<String, Instance> oldInstances, Map<String, Instance> map) {
@ -550,72 +550,72 @@ public class ServiceManager implements DataListener<Service> {
return oldInstances;
}
public Service getService(String namespaceId, String domName) {
public Service getService(String namespaceId, String serviceName) {
if (serviceMap.get(namespaceId) == null) {
return null;
}
return chooseDomMap(namespaceId).get(domName);
return chooseServiceMap(namespaceId).get(serviceName);
}
public void putDomain(Service domain) {
if (!serviceMap.containsKey(domain.getNamespaceId())) {
serviceMap.put(domain.getNamespaceId(), new ConcurrentHashMap<>(16));
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
}
serviceMap.get(domain.getNamespaceId()).put(domain.getName(), domain);
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
public List<Service> searchDomains(String namespaceId, String regex) {
public List<Service> searchServices(String namespaceId, String regex) {
List<Service> result = new ArrayList<>();
for (Map.Entry<String, Service> entry : chooseDomMap(namespaceId).entrySet()) {
Service dom = entry.getValue();
String key = dom.getName() + ":" + ArrayUtils.toString(dom.getOwners());
for (Map.Entry<String, Service> entry : chooseServiceMap(namespaceId).entrySet()) {
Service service = entry.getValue();
String key = service.getName() + ":" + ArrayUtils.toString(service.getOwners());
if (key.matches(regex)) {
result.add(dom);
result.add(service);
}
}
return result;
}
public int getDomCount() {
int domCount = 0;
public int getServiceCount() {
int serviceCount = 0;
for (String namespaceId : serviceMap.keySet()) {
domCount += serviceMap.get(namespaceId).size();
serviceCount += serviceMap.get(namespaceId).size();
}
return domCount;
return serviceCount;
}
public int getInstanceCount() {
int total = 0;
for (String namespaceId : serviceMap.keySet()) {
for (Service domain : serviceMap.get(namespaceId).values()) {
total += domain.allIPs().size();
for (Service service : serviceMap.get(namespaceId).values()) {
total += service.allIPs().size();
}
}
return total;
}
public Map<String, Service> getDomMap(String namespaceId) {
public Map<String, Service> getServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
public int getPagedDom(String namespaceId, int startPage, int pageSize, String keyword, List<Service> domainList) {
public int getPagedService(String namespaceId, int startPage, int pageSize, String keyword, List<Service> serviceList) {
List<Service> matchList;
if (chooseDomMap(namespaceId) == null) {
if (chooseServiceMap(namespaceId) == null) {
return 0;
}
if (StringUtils.isNotBlank(keyword)) {
matchList = searchDomains(namespaceId, ".*" + keyword + ".*");
matchList = searchServices(namespaceId, ".*" + keyword + ".*");
} else {
matchList = new ArrayList<Service>(chooseDomMap(namespaceId).values());
matchList = new ArrayList<Service>(chooseServiceMap(namespaceId).values());
}
if (pageSize >= matchList.size()) {
domainList.addAll(matchList);
serviceList.addAll(matchList);
return matchList.size();
}
@ -624,9 +624,9 @@ public class ServiceManager implements DataListener<Service> {
continue;
}
domainList.add(matchList.get(i));
serviceList.add(matchList.get(i));
if (domainList.size() >= pageSize) {
if (serviceList.size() >= pageSize) {
break;
}
}
@ -634,61 +634,61 @@ public class ServiceManager implements DataListener<Service> {
return matchList.size();
}
public static class DomainChecksum {
public static class ServiceChecksum {
public String namespaceId;
public Map<String, String> domName2Checksum = new HashMap<String, String>();
public Map<String, String> serviceName2Checksum = new HashMap<String, String>();
public DomainChecksum() {
public ServiceChecksum() {
this.namespaceId = UtilsAndCommons.DEFAULT_NAMESPACE_ID;
}
public DomainChecksum(String namespaceId) {
public ServiceChecksum(String namespaceId) {
this.namespaceId = namespaceId;
}
public void addItem(String domName, String checksum) {
if (StringUtils.isEmpty(domName) || StringUtils.isEmpty(checksum)) {
Loggers.SRV_LOG.warn("[DOMAIN-CHECKSUM] domName or checksum is empty,domName: {}, checksum: {}",
domName, checksum);
public void addItem(String serviceName, String checksum) {
if (StringUtils.isEmpty(serviceName) || StringUtils.isEmpty(checksum)) {
Loggers.SRV_LOG.warn("[DOMAIN-CHECKSUM] serviceName or checksum is empty,serviceName: {}, checksum: {}",
serviceName, checksum);
return;
}
domName2Checksum.put(domName, checksum);
serviceName2Checksum.put(serviceName, checksum);
}
}
private class DomainReporter implements Runnable {
private class ServiceReporter implements Runnable {
@Override
public void run() {
try {
Map<String, Set<String>> allDomainNames = getAllDomNames();
Map<String, Set<String>> allServiceNames = getAllServiceNames();
if (allDomainNames.size() <= 0) {
if (allServiceNames.size() <= 0) {
//ignore
return;
}
for (String namespaceId : allDomainNames.keySet()) {
for (String namespaceId : allServiceNames.keySet()) {
DomainChecksum checksum = new DomainChecksum(namespaceId);
ServiceChecksum checksum = new ServiceChecksum(namespaceId);
for (String domName : allDomainNames.get(namespaceId)) {
if (!distroMapper.responsible(domName)) {
for (String serviceName : allServiceNames.get(namespaceId)) {
if (!distroMapper.responsible(serviceName)) {
continue;
}
Service domain = getService(namespaceId, domName);
Service service = getService(namespaceId, serviceName);
if (domain == null) {
if (service == null) {
continue;
}
domain.recalculateChecksum();
service.recalculateChecksum();
checksum.addItem(domName, domain.getChecksum());
checksum.addItem(serviceName, service.getChecksum());
}
Message msg = new Message();
@ -709,7 +709,7 @@ public class ServiceManager implements DataListener<Service> {
}
}
} catch (Exception e) {
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending domain status", e);
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
} finally {
UtilsAndCommons.DOMAIN_SYNCHRONIZATION_EXECUTOR.schedule(this, switchDomain.getDomStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS);
}
@ -718,8 +718,8 @@ public class ServiceManager implements DataListener<Service> {
public void wakeUp(String key) {
Lock lock = dom2LockMap.get(key);
Condition condition = dom2ConditionMap.get(key);
Lock lock = service2LockMap.get(key);
Condition condition = service2ConditionMap.get(key);
try {
lock.lock();
@ -732,23 +732,23 @@ public class ServiceManager implements DataListener<Service> {
public Lock addLockIfAbsent(String key) {
if (dom2LockMap.containsKey(key)) {
return dom2LockMap.get(key);
if (service2LockMap.containsKey(key)) {
return service2LockMap.get(key);
}
Lock lock = new ReentrantLock();
dom2LockMap.put(key, lock);
service2LockMap.put(key, lock);
return lock;
}
public Condition addCondtion(String key) {
Condition condition = dom2LockMap.get(key).newCondition();
dom2ConditionMap.put(key, condition);
Condition condition = service2LockMap.get(key).newCondition();
service2ConditionMap.put(key, condition);
return condition;
}
private static class DomainKey {
private static class ServiceKey {
private String namespaceId;
private String domName;
private String serviceName;
private String serverIP;
public String getChecksum() {
@ -759,8 +759,8 @@ public class ServiceManager implements DataListener<Service> {
return serverIP;
}
public String getDomName() {
return domName;
public String getServiceName() {
return serviceName;
}
public String getNamespaceId() {
@ -769,9 +769,9 @@ public class ServiceManager implements DataListener<Service> {
private String checksum;
public DomainKey(String namespaceId, String domName, String serverIP, String checksum) {
public ServiceKey(String namespaceId, String serviceName, String serverIP, String checksum) {
this.namespaceId = namespaceId;
this.domName = domName;
this.serviceName = serviceName;
this.serverIP = serverIP;
this.checksum = checksum;
}

View File

@ -74,7 +74,7 @@ public class ClientBeatCheckTask implements Runnable {
Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
UtilsAndCommons.LOCALHOST_SITE, ClientBeatProcessor.CLIENT_BEAT_TIMEOUT, instance.getLastBeat());
getPushService().domChanged(domain.getNamespaceId(), domain.getName());
getPushService().serviceChanged(domain.getNamespaceId(), domain.getName());
}
}
}

View File

@ -78,7 +78,7 @@ public class ClientBeatProcessor implements Runnable {
instance.setValid(true);
Loggers.EVT_LOG.info("dom: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getDom().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
getPushService().domChanged(service.getNamespaceId(), this.service.getName());
getPushService().serviceChanged(service.getNamespaceId(), this.service.getName());
}
}
}

View File

@ -145,7 +145,7 @@ public class HealthCheckCommon {
Service vDom = cluster.getDom();
vDom.setLastModifiedMillis(System.currentTimeMillis());
pushService.domChanged(vDom.getNamespaceId(), vDom.getName());
pushService.serviceChanged(vDom.getNamespaceId(), vDom.getName());
addResult(new HealthCheckResult(vDom.getName(), ip));
Loggers.EVT_LOG.info("dom: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",
@ -184,7 +184,7 @@ public class HealthCheckCommon {
vDom.setLastModifiedMillis(System.currentTimeMillis());
addResult(new HealthCheckResult(vDom.getName(), ip));
pushService.domChanged(vDom.getNamespaceId(), vDom.getName());
pushService.serviceChanged(vDom.getNamespaceId(), vDom.getName());
Loggers.EVT_LOG.info("dom: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
cluster.getDom().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);
@ -218,7 +218,7 @@ public class HealthCheckCommon {
Service vDom = cluster.getDom();
vDom.setLastModifiedMillis(System.currentTimeMillis());
pushService.domChanged(vDom.getNamespaceId(), vDom.getName());
pushService.serviceChanged(vDom.getNamespaceId(), vDom.getName());
addResult(new HealthCheckResult(vDom.getName(), ip));
Loggers.EVT_LOG.info("dom: {} {POS} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",

View File

@ -37,6 +37,8 @@ public class GlobalExecutor {
private static final long PARTITION_DATA_TIMED_SYNC_INTERVAL = TimeUnit.SECONDS.toMillis(5);
private static final long SERVER_STATUS_UPDATE_PERIOD = TimeUnit.SECONDS.toMillis(5);
private static ScheduledExecutorService executorService =
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
@ -122,12 +124,16 @@ public class GlobalExecutor {
SERVER_STATUS_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
public static void registerServerStatusUpdater(Runnable runnable) {
executorService.scheduleAtFixedRate(runnable, 0, SERVER_STATUS_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
}
public static void registerHeartbeat(Runnable runnable) {
executorService.scheduleWithFixedDelay(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}
public static void schedule(Runnable runnable, long delay) {
executorService.scheduleAtFixedRate(runnable, 0, delay, TimeUnit.MILLISECONDS);
public static void schedule(Runnable runnable, long period) {
executorService.scheduleAtFixedRate(runnable, 0, period, TimeUnit.MILLISECONDS);
}
public static void notifyServerListChange(Runnable runnable) {

View File

@ -106,6 +106,7 @@ public class HttpClient {
conn.setRequestMethod(method);
conn.addRequestProperty("Client-Version", UtilsAndCommons.SERVER_VERSION);
conn.addRequestProperty("User-Agent", UtilsAndCommons.SERVER_VERSION);
setHeaders(conn, headers, encoding);
conn.connect();
@ -419,6 +420,7 @@ public class HttpClient {
+ encoding);
conn.addRequestProperty("Accept-Charset", encoding);
conn.addRequestProperty("Client-Version", UtilsAndCommons.SERVER_VERSION);
conn.addRequestProperty("User-Agent", UtilsAndCommons.SERVER_VERSION);
}
public static String encodingParams(Map<String, String> params, String encoding)

View File

@ -42,6 +42,7 @@ public class NamingProxy {
Map<String, String> headers = new HashMap<>(128);
headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION);
headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION);
headers.put("Connection", "Keep-Alive");
HttpClient.asyncHttpPutLarge("http://" + server + RunningConfig.getContextPath()
@ -93,6 +94,7 @@ public class NamingProxy {
Map<String, String> headers = new HashMap<>(128);
headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION);
headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION);
headers.put("Accept-Encoding", "gzip,deflate,sdch");
headers.put("Connection", "Keep-Alive");
headers.put("Content-Encoding", "gzip");
@ -121,6 +123,7 @@ public class NamingProxy {
public static String reqAPI(String api, Map<String, String> params, String curServer) throws Exception {
try {
List<String> headers = Arrays.asList("Client-Version", UtilsAndCommons.SERVER_VERSION,
"User-Agent", UtilsAndCommons.SERVER_VERSION,
"Accept-Encoding", "gzip,deflate,sdch",
"Connection", "Keep-Alive",
"Content-Encoding", "gzip");
@ -155,6 +158,7 @@ public class NamingProxy {
public static String reqAPI(String api, Map<String, String> params, String curServer, boolean isPost) throws Exception {
try {
List<String> headers = Arrays.asList("Client-Version", UtilsAndCommons.SERVER_VERSION,
"User-Agent", UtilsAndCommons.SERVER_VERSION,
"Accept-Encoding", "gzip,deflate,sdch",
"Connection", "Keep-Alive",
"Content-Encoding", "gzip");

View File

@ -24,6 +24,8 @@ import java.util.HashMap;
import java.util.Map;
/**
* Report local server status to other server
*
* @author nacos
*/
public class ServerStatusSynchronizer implements Synchronizer {

View File

@ -93,6 +93,8 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
public boolean enableAuthentication = false;
public String overriddenServerStatus = null;
public boolean isEnableAuthentication() {
return enableAuthentication;
}
@ -153,26 +155,6 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
}
@Override
public boolean interests(String key) {
return key.contains(UtilsAndCommons.SWITCH_DOMAIN_NAME);
}
@Override
public boolean matchUnlistenKey(String key) {
return key.contains(UtilsAndCommons.SWITCH_DOMAIN_NAME);
}
@Override
public void onChange(String key, SwitchDomain domain) throws Exception {
update(domain);
}
@Override
public void onDelete(String key) throws Exception {
}
public List<String> getIncrementalList() {
return incrementalList;
}
@ -371,10 +353,38 @@ public class SwitchDomain implements DataListener<SwitchDomain> {
this.pushCVersion = pushCVersion;
}
public String getOverriddenServerStatus() {
return overriddenServerStatus;
}
public void setOverriddenServerStatus(String overriddenServerStatus) {
this.overriddenServerStatus = overriddenServerStatus;
}
public void replace(SwitchDomain newSwitchDomain) {
// TODO
}
@Override
public boolean interests(String key) {
return key.contains(UtilsAndCommons.SWITCH_DOMAIN_NAME);
}
@Override
public boolean matchUnlistenKey(String key) {
return key.contains(UtilsAndCommons.SWITCH_DOMAIN_NAME);
}
@Override
public void onChange(String key, SwitchDomain domain) throws Exception {
update(domain);
}
@Override
public void onDelete(String key) throws Exception {
}
public interface HealthParams {
/**
* Maximum RT

View File

@ -63,4 +63,7 @@ public class SwitchEntry {
public static final String ACTION_OVERVIEW = "overview";
public static final String PARAM_JSON = "json";
public static final String READ_ENABLED = "readEnabled";
public static final String OVERRIDDEN_SERVER_STATUS = "overriddenServerStatus";
}

View File

@ -63,8 +63,8 @@ public class SwitchManager {
try {
lock.lock();
Datum datum = (Datum) consistencyService.get(UtilsAndCommons.getSwitchDomainKey());
SwitchDomain switchDomain = null;
Datum datum = consistencyService.get(UtilsAndCommons.getSwitchDomainKey());
SwitchDomain switchDomain;
if (datum != null) {
switchDomain = JSON.parseObject((String) datum.value, SwitchDomain.class);
@ -139,7 +139,6 @@ public class SwitchManager {
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -167,7 +166,6 @@ public class SwitchManager {
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -182,7 +180,6 @@ public class SwitchManager {
switchDomain.setPushCacheMillis(cacheMillis);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -198,7 +195,6 @@ public class SwitchManager {
switchDomain.setDefaultCacheMillis(cacheMillis);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -219,7 +215,6 @@ public class SwitchManager {
switchDomain.setDistroEnabled(enabled);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -230,7 +225,6 @@ public class SwitchManager {
switchDomain.setHealthCheckEnabled(enabled);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -245,7 +239,6 @@ public class SwitchManager {
switchDomain.setDomStatusSynchronizationPeriodMillis(millis);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -260,7 +253,6 @@ public class SwitchManager {
switchDomain.setServerStatusSynchronizationPeriodMillis(millis);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -271,7 +263,6 @@ public class SwitchManager {
switchDomain.setCheckTimes(times);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -282,7 +273,6 @@ public class SwitchManager {
switchDomain.setDisableAddIP(disableAddIP);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -293,7 +283,6 @@ public class SwitchManager {
switchDomain.setEnableCache(enableCache);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -304,7 +293,6 @@ public class SwitchManager {
switchDomain.setSendBeatOnly(sendBeatOnly);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
@ -338,22 +326,31 @@ public class SwitchManager {
switchDomain.setLimitedUrlMap(limitedUrlMap);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
}
if (entry.equals(SwitchEntry.ENABLE_STANDALONE)) {
String enable = value;
String enabled = value;
if (!StringUtils.isNotEmpty(enable)) {
switchDomain.setEnableStandalone(Boolean.parseBoolean(enable));
if (!StringUtils.isNotEmpty(enabled)) {
switchDomain.setEnableStandalone(Boolean.parseBoolean(enabled));
}
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
;
}
return;
}
if (entry.equals(SwitchEntry.OVERRIDDEN_SERVER_STATUS)) {
String status = value;
switchDomain.setOverriddenServerStatus(status);
if (!debug) {
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), JSON.toJSONString(switchDomain));
}
return;

View File

@ -28,10 +28,14 @@ import com.alibaba.nacos.naming.selector.Selector;
import com.alibaba.nacos.naming.selector.SelectorJsonAdapter;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import static com.alibaba.nacos.common.util.SystemUtils.NACOS_HOME;
import static com.alibaba.nacos.common.util.SystemUtils.NACOS_HOME_KEY;
/**
* @author nacos
*/
@ -77,11 +81,9 @@ public class UtilsAndCommons {
public static final String SWITCH_DOMAIN_NAME = "00-00---000-VIPSRV_SWITCH_DOMAIN-000---00-00";
static public final String CIDR_REGEX = "[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}/[0-9]+";
public static final String CIDR_REGEX = "[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}/[0-9]+";
static public final String UNKNOWN_SITE = "unknown";
static public final String UNKNOWN_HOST = "unknown";
public static final String UNKNOWN_SITE = "unknown";
public static final String DEFAULT_CLUSTER_NAME = "DEFAULT";
@ -89,11 +91,6 @@ public class UtilsAndCommons {
public static final int RAFT_PUBLISH_TIMEOUT = 5000;
static public final String RAFT_DOM_PRE = "meta.";
static public final String RAFT_IPLIST_PRE = "iplist.";
static public final String RAFT_TAG_DOM_PRE = "tag.meta";
static public final String RAFT_TAG_IPLIST_PRE = "tag.iplist.";
public static final String SERVER_VERSION = NACOS_SERVER_HEADER + ":" + NACOS_VERSION;
public static final String SELF_SERVICE_CLUSTER_ENV = "naming_self_service_cluster_ips";
@ -120,11 +117,9 @@ public class UtilsAndCommons {
public static final String UPDATE_INSTANCE_ACTION_REMOVE = "remove";
public static final String INSTANCE_LIST_PERSISTED_PROPERTY_KEY = "nacos.instanceListPersisted";
public static final String DEFAULT_NAMESPACE_ID = "public";
public static final boolean INSTANCE_LIST_PERSISTED = Boolean.getBoolean(INSTANCE_LIST_PERSISTED_PROPERTY_KEY);
public static final String DATA_BASE_DIR = NACOS_HOME + File.separator + "data" + File.separator + "naming";
public static final ScheduledExecutorService DOMAIN_SYNCHRONIZATION_EXECUTOR;
@ -135,6 +130,7 @@ public class UtilsAndCommons {
public static final Executor RAFT_PUBLISH_EXECUTOR;
static {
// custom serializer and deserializer for fast-json
SerializeConfig.getGlobalInstance()
.put(AbstractHealthChecker.class, JsonAdapter.getInstance());

View File

@ -103,7 +103,7 @@ public class PerformanceLoggerThread {
@Scheduled(cron = "0/15 * * * * ?")
public void collectmetrics() {
int domCount = serviceManager.getDomCount();
int domCount = serviceManager.getServiceCount();
MetricsMonitor.getDomCountMonitor().set(domCount);
int ipCount = serviceManager.getInstanceCount();
@ -132,7 +132,7 @@ public class PerformanceLoggerThread {
@Override
public void run() {
try {
int domCount = serviceManager.getDomCount();
int domCount = serviceManager.getServiceCount();
int ipCount = serviceManager.getInstanceCount();
long maxPushMaxCost = getMaxPushCost();
long maxPushCost = getMaxPushCost();

View File

@ -89,7 +89,6 @@ public class PushService {
}
});
static {
try {
udpSocket = new DatagramSocket();
@ -218,10 +217,10 @@ public class PushService {
return dom + UtilsAndCommons.CACHE_KEY_SPLITER + agent;
}
public void domChanged(final String namespaceId, final String dom) {
public void serviceChanged(final String namespaceId, final String serviceName) {
// merge some change events to reduce the push frequency:
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, dom))) {
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName))) {
return;
}
@ -229,8 +228,8 @@ public class PushService {
@Override
public void run() {
try {
Loggers.PUSH.info(dom + " is changed, add it to push queue.");
ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, dom));
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
@ -246,8 +245,8 @@ public class PushService {
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", dom, client.toString());
String key = getPushCacheKey(dom, client.getIp(), client.getAgent());
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
@ -255,7 +254,7 @@ public class PushService {
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", dom, client.getAddrStr());
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
if (compressData != null) {
@ -273,16 +272,16 @@ public class PushService {
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", dom, e);
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, dom));
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, dom), future);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
public boolean canEnablePush(String agent) {

View File

@ -24,13 +24,12 @@ import javax.servlet.Filter;
/**
* @author nkorange
*/
@Configuration
public class NamingConfig {
@Bean
public FilterRegistrationBean distroFilterRegistration() {
FilterRegistrationBean registration = new FilterRegistrationBean();
FilterRegistrationBean<DistroFilter> registration = new FilterRegistrationBean<>();
registration.setFilter(distroFilter());
registration.addUrlPatterns("/*");
registration.setName("distroFilter");
@ -39,9 +38,20 @@ public class NamingConfig {
return registration;
}
@Bean
public FilterRegistrationBean trafficReviseFilterRegistration() {
FilterRegistrationBean<TrafficReviseFilter> registration = new FilterRegistrationBean<>();
registration.setFilter(trafficReviseFilter());
registration.addUrlPatterns("/*");
registration.setName("trafficReviseFilter");
registration.setOrder(1);
return registration;
}
@Bean
public FilterRegistrationBean authFilterRegistration() {
FilterRegistrationBean registration = new FilterRegistrationBean();
FilterRegistrationBean<AuthFilter> registration = new FilterRegistrationBean<>();
registration.setFilter(authFilter());
registration.addUrlPatterns("/api/*", "/raft/*");
@ -52,12 +62,17 @@ public class NamingConfig {
}
@Bean
public Filter distroFilter() {
public DistroFilter distroFilter() {
return new DistroFilter();
}
@Bean
public Filter authFilter() {
public TrafficReviseFilter trafficReviseFilter() {
return new TrafficReviseFilter();
}
@Bean
public AuthFilter authFilter() {
return new AuthFilter();
}

View File

@ -0,0 +1,79 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.web;
import com.alibaba.nacos.common.util.HttpMethod;
import com.alibaba.nacos.naming.cluster.ServerStatus;
import com.alibaba.nacos.naming.cluster.ServerStatusManager;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
* Filter incoming traffic to refuse or revise unexpected requests
*
* @author nkorange
* @since 1.0.0
*/
public class TrafficReviseFilter implements Filter {
@Autowired
private ServerStatusManager serverStatusManager;
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest req = (HttpServletRequest) request;
HttpServletResponse resp = (HttpServletResponse) response;
// if server is UP:
if (serverStatusManager.getServerStatus() == ServerStatus.UP) {
filterChain.doFilter(req, resp);
return;
}
// requests from peer server should be let pass:
String agent = req.getHeader("Client-Version");
if (StringUtils.isBlank(agent)) {
agent = req.getHeader("User-Agent");
}
if (StringUtils.startsWith(agent, UtilsAndCommons.NACOS_SERVER_HEADER)) {
filterChain.doFilter(req, resp);
return;
}
// write operation should be let pass in WRITE_ONLY status:
if (serverStatusManager.getServerStatus() == ServerStatus.WRITE_ONLY && !HttpMethod.GET.equals(req.getMethod())) {
filterChain.doFilter(req, resp);
return;
}
// read operation should be let pass in READY_ONLY status:
if (serverStatusManager.getServerStatus() == ServerStatus.READY_ONLY && HttpMethod.GET.equals(req.getMethod())) {
filterChain.doFilter(req, resp);
return;
}
resp.getWriter().write("service is " + serverStatusManager.getServerStatus().name() + " now, please try again later!");
resp.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}
}

View File

@ -46,7 +46,7 @@ public class DomainsManagerTest extends BaseTest {
@Test
public void easyRemoveDom() throws Exception {
domainsManager.easyRemoveDom(UtilsAndCommons.DEFAULT_NAMESPACE_ID, "nacos.test.1");
domainsManager.easyRemoveService(UtilsAndCommons.DEFAULT_NAMESPACE_ID, "nacos.test.1");
}
@Test
@ -54,9 +54,9 @@ public class DomainsManagerTest extends BaseTest {
Service domain = new Service();
domain.setName("nacos.test.1");
domainsManager.chooseDomMap(UtilsAndCommons.DEFAULT_NAMESPACE_ID).put("nacos.test.1", domain);
domainsManager.chooseServiceMap(UtilsAndCommons.DEFAULT_NAMESPACE_ID).put("nacos.test.1", domain);
List<Service> list = domainsManager.searchDomains(UtilsAndCommons.DEFAULT_NAMESPACE_ID, "nacos.test.*");
List<Service> list = domainsManager.searchServices(UtilsAndCommons.DEFAULT_NAMESPACE_ID, "nacos.test.*");
Assert.assertNotNull(list);
Assert.assertEquals(1, list.size());
Assert.assertEquals("nacos.test.1", list.get(0).getName());

View File

@ -30,6 +30,9 @@ public class RaftStoreTest {
@Mock
public RaftCore raftCore;
@Mock
public RaftStore raftStore;
@Test
public void wrietDatum() throws Exception {
@ -37,7 +40,7 @@ public class RaftStoreTest {
datum.key = "1.2.3.4";
datum.value = "value1";
RaftStore.write(datum);
raftStore.write(datum);
raftCore.loadDatum("1.2.3.4");