Merge branch 'feature_multi_tenant' into feature_naming_ap

# Conflicts:
#	naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
#	naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HttpHealthCheckProcessor.java
#	naming/src/main/java/com/alibaba/nacos/naming/healthcheck/MysqlHealthCheckProcessor.java
#	naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java
#	naming/src/main/java/com/alibaba/nacos/naming/monitor/PerformanceLoggerThread.java
#	naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
This commit is contained in:
nkorange 2019-01-14 16:59:13 +08:00
commit d0bfc0e179
48 changed files with 1129 additions and 125 deletions

View File

@ -109,7 +109,7 @@ public class Constants {
public static final String WORD_SEPARATOR = Character.toString((char)2);
public static final String LONGPULLING_LINE_SEPARATOR = "\r\n";
public static final String LONGPOLLING_LINE_SEPARATOR = "\r\n";
public static final String CLIENT_APPNAME_HEADER = "Client-AppName";
public static final String CLIENT_REQUEST_TS_HEADER = "Client-RequestTS";

View File

@ -92,7 +92,17 @@ public interface NamingService {
List<Instance> getAllInstances(String serviceName) throws NacosException;
/**
* get all instances within specified clusters of a service
* Get all instances of a service
*
* @param serviceName name of service
* @param subscribe if subscribe the service
* @return A list of instance
* @throws NacosException
*/
List<Instance> getAllInstances(String serviceName, boolean subscribe) throws NacosException;
/**
* Get all instances within specified clusters of a service
*
* @param serviceName name of service
* @param clusters list of cluster
@ -102,7 +112,18 @@ public interface NamingService {
List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException;
/**
* get qualified instances of service
* Get all instances within specified clusters of a service
*
* @param serviceName name of service
* @param clusters list of cluster
* @param subscribe if subscribe the service
* @return A list of qualified instance
* @throws NacosException
*/
List<Instance> getAllInstances(String serviceName, List<String> clusters, boolean subscribe) throws NacosException;
/**
* Get qualified instances of service
*
* @param serviceName name of service
* @param healthy a flag to indicate returning healthy or unhealthy instances
@ -112,7 +133,18 @@ public interface NamingService {
List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException;
/**
* get qualified instances within specified clusters of service
* Get qualified instances of service
*
* @param serviceName name of service
* @param healthy a flag to indicate returning healthy or unhealthy instances
* @param subscribe if subscribe the service
* @return A qualified list of instance
* @throws NacosException
*/
List<Instance> selectInstances(String serviceName, boolean healthy, boolean subscribe) throws NacosException;
/**
* Get qualified instances within specified clusters of service
*
* @param serviceName name of service
* @param clusters list of cluster
@ -123,7 +155,19 @@ public interface NamingService {
List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy) throws NacosException;
/**
* select one healthy instance of service using predefined load balance strategy
* Get qualified instances within specified clusters of service
*
* @param serviceName name of service
* @param clusters list of cluster
* @param healthy a flag to indicate returning healthy or unhealthy instances
* @param subscribe if subscribe the service
* @return A qualified list of instance
* @throws NacosException
*/
List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException;
/**
* Select one healthy instance of service using predefined load balance strategy
*
* @param serviceName name of service
* @return qualified instance
@ -135,6 +179,16 @@ public interface NamingService {
* select one healthy instance of service using predefined load balance strategy
*
* @param serviceName name of service
* @param subscribe if subscribe the service
* @return qualified instance
* @throws NacosException
*/
Instance selectOneHealthyInstance(String serviceName, boolean subscribe) throws NacosException;
/**
* Select one healthy instance of service using predefined load balance strategy
*
* @param serviceName name of service
* @param clusters a list of clusters should the instance belongs to
* @return qualified instance
* @throws NacosException
@ -142,7 +196,18 @@ public interface NamingService {
Instance selectOneHealthyInstance(String serviceName, List<String> clusters) throws NacosException;
/**
* subscribe service to receive events of instances alteration
* Select one healthy instance of service using predefined load balance strategy
*
* @param serviceName name of service
* @param clusters a list of clusters should the instance belongs to
* @param subscribe if subscribe the service
* @return qualified instance
* @throws NacosException
*/
Instance selectOneHealthyInstance(String serviceName, List<String> clusters, boolean subscribe) throws NacosException;
/**
* Subscribe service to receive events of instances alteration
*
* @param serviceName name of service
* @param listener event listener

View File

@ -107,5 +107,10 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -23,10 +23,12 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.config.filter.impl.ConfigRequest;
import com.alibaba.nacos.client.config.filter.impl.ConfigResponse;
import com.alibaba.nacos.client.config.http.HttpAgent;
import com.alibaba.nacos.client.config.http.MetricsHttpAgent;
import com.alibaba.nacos.client.config.http.ServerHttpAgent;
import com.alibaba.nacos.client.config.impl.ClientWorker;
import com.alibaba.nacos.client.config.impl.HttpSimpleClient.HttpResult;
import com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor;
import com.alibaba.nacos.client.config.impl.ServerHttpAgent;
import com.alibaba.nacos.client.config.utils.ContentUtils;
import com.alibaba.nacos.client.config.utils.LogUtils;
import com.alibaba.nacos.client.config.utils.ParamUtils;
@ -55,9 +57,9 @@ public class NacosConfigService implements ConfigService {
/**
* http agent
*/
private ServerHttpAgent agent;
private HttpAgent agent;
/**
* longpulling
* longpolling
*/
private ClientWorker worker;
private String namespace;
@ -79,7 +81,7 @@ public class NacosConfigService implements ConfigService {
namespace = namespaceTmp;
properties.put(PropertyKeyConst.NAMESPACE, namespace);
}
agent = new ServerHttpAgent(properties);
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
agent.start();
worker = new ClientWorker(agent, configFilterChainManager);
}
@ -131,9 +133,11 @@ public class NacosConfigService implements ConfigService {
try {
content = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {

View File

@ -0,0 +1,98 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.config.http;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.impl.HttpSimpleClient.HttpResult;
import java.io.IOException;
import java.util.List;
/**
* HttpAgent
*
* @author Nacos
*/
public interface HttpAgent {
/**
* start to get nacos ip list
* @return Nothing.
* @throws NacosException on get ip list error.
*/
void start() throws NacosException;
/**
* invoke http get method
* @param path http path
* @param headers http headers
* @param paramValues http paramValues http
* @param encoding http encode
* @param readTimeoutMs http timeout
* @return HttpResult http response
* @throws IOException If an input or output exception occurred
*/
HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException;
/**
* invoke http post method
* @param path http path
* @param headers http headers
* @param paramValues http paramValues http
* @param encoding http encode
* @param readTimeoutMs http timeout
* @return HttpResult http response
* @throws IOException If an input or output exception occurred
*/
HttpResult httpPost(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException;
/**
* invoke http delete method
* @param path http path
* @param headers http headers
* @param paramValues http paramValues http
* @param encoding http encode
* @param readTimeoutMs http timeout
* @return HttpResult http response
* @throws IOException If an input or output exception occurred
*/
HttpResult httpDelete(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException;
/**
* get name
* @return String
*/
String getName();
/**
* get namespace
* @return String
*/
String getNamespace();
/**
* get tenant
* @return String
*/
String getTenant();
/**
* get encode
* @return String
*/
String getEncode();
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.config.http;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.impl.HttpSimpleClient.HttpResult;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* MetricsHttpAgent
*
* @author Nacos
*/
public class MetricsHttpAgent implements HttpAgent {
private HttpAgent httpAgent;
public MetricsHttpAgent(HttpAgent httpAgent) {
this.httpAgent = httpAgent;
}
@Override
public void start() throws NacosException {
httpAgent.start();
}
@Override
public HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException {
long start = System.currentTimeMillis();
long end = 0;
HttpResult result = null;
try {
result = httpAgent.httpGet(path, headers, paramValues, encoding, readTimeoutMs);
} catch (IOException e) {
end = System.currentTimeMillis();
MetricsMonitor.getConfigRequestMonitor("GET", path, "NA").record(end - start, TimeUnit.MILLISECONDS);
throw e;
}
end = System.currentTimeMillis();
MetricsMonitor.getConfigRequestMonitor("GET", path, String.valueOf(result.code)).record(end - start, TimeUnit.MILLISECONDS);
return result;
}
@Override
public HttpResult httpPost(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException {
long start = System.currentTimeMillis();
long end = 0;
HttpResult result = null;
try {
result = httpAgent.httpPost(path, headers, paramValues, encoding, readTimeoutMs);
} catch (IOException e) {
end = System.currentTimeMillis();
MetricsMonitor.getConfigRequestMonitor("POST", path, "NA").record(end - start, TimeUnit.MILLISECONDS);
throw e;
}
end = System.currentTimeMillis();
MetricsMonitor.getConfigRequestMonitor("POST", path, String.valueOf(result.code)).record(end - start, TimeUnit.MILLISECONDS);
return result;
}
@Override
public HttpResult httpDelete(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException {
long start = System.currentTimeMillis();
long end = 0;
HttpResult result = null;
try {
result = httpAgent.httpDelete(path, headers, paramValues, encoding, readTimeoutMs);
} catch (IOException e) {
end = System.currentTimeMillis();
MetricsMonitor.getConfigRequestMonitor("DELETE", path, "NA").record(end - start, TimeUnit.MILLISECONDS);
throw e;
}
end = System.currentTimeMillis();
MetricsMonitor.getConfigRequestMonitor("DELETE", path, String.valueOf(result.code)).record(end - start, TimeUnit.MILLISECONDS);
return result;
}
@Override
public String getName() {
return httpAgent.getName();
}
@Override
public String getNamespace() {
return httpAgent.getNamespace();
}
@Override
public String getTenant() {
return httpAgent.getTenant();
}
@Override
public String getEncode() {
return httpAgent.getEncode();
}
}

View File

@ -13,17 +13,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.config.impl;
package com.alibaba.nacos.client.config.http;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.impl.HttpSimpleClient;
import com.alibaba.nacos.client.config.impl.HttpSimpleClient.HttpResult;
import com.alibaba.nacos.client.config.impl.ServerListManager;
import com.alibaba.nacos.client.config.impl.SpasAdapter;
import com.alibaba.nacos.client.config.utils.IOUtils;
import com.alibaba.nacos.client.config.utils.LogUtils;
import com.alibaba.nacos.client.identify.STSConfig;
import com.alibaba.nacos.client.logger.Logger;
import com.alibaba.nacos.client.logger.support.LoggerHelper;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.utils.JSONUtils;
import com.alibaba.nacos.client.utils.ParamUtil;
import com.alibaba.nacos.client.utils.StringUtils;
@ -39,13 +43,14 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
* Server Agent
*
* @author water.lyl
*/
public class ServerHttpAgent {
public class ServerHttpAgent implements HttpAgent {
final static public Logger log = LogUtils.logger(ServerHttpAgent.class);
@ -58,6 +63,7 @@ public class ServerHttpAgent {
* @return
* @throws IOException
*/
@Override
public HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding,
long readTimeoutMs) throws IOException {
final long endTime = System.currentTimeMillis() + readTimeoutMs;
@ -101,6 +107,7 @@ public class ServerHttpAgent {
throw new ConnectException("no available server");
}
@Override
public HttpResult httpPost(String path, List<String> headers, List<String> paramValues, String encoding,
long readTimeoutMs) throws IOException {
final long endTime = System.currentTimeMillis() + readTimeoutMs;
@ -143,6 +150,7 @@ public class ServerHttpAgent {
throw new ConnectException("no available server");
}
@Override
public HttpResult httpDelete(String path, List<String> headers, List<String> paramValues, String encoding,
long readTimeoutMs) throws IOException {
final long endTime = System.currentTimeMillis() + readTimeoutMs;
@ -241,6 +249,7 @@ public class ServerHttpAgent {
}
}
@Override
public synchronized void start() throws NacosException {
serverListMgr.start();
}
@ -324,18 +333,22 @@ public class ServerHttpAgent {
"can not get security credentials, responseCode: " + respCode + ", response: " + response);
}
@Override
public String getName() {
return serverListMgr.getName();
}
@Override
public String getNamespace() {
return serverListMgr.getNamespace();
}
@Override
public String getTenant() {
return serverListMgr.getTenant();
}
@Override
public String getEncode() {
return encode;
}

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.common.GroupKey;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.config.http.HttpAgent;
import com.alibaba.nacos.client.config.impl.HttpSimpleClient.HttpResult;
import com.alibaba.nacos.client.config.utils.ContentUtils;
import com.alibaba.nacos.client.config.utils.LogUtils;
@ -27,6 +28,7 @@ import com.alibaba.nacos.client.config.utils.MD5;
import com.alibaba.nacos.client.config.utils.TenantUtil;
import com.alibaba.nacos.client.logger.Logger;
import com.alibaba.nacos.client.logger.support.LoggerHelper;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.utils.ParamUtil;
import com.alibaba.nacos.client.utils.StringUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@ -43,7 +45,7 @@ import static com.alibaba.nacos.api.common.Constants.LINE_SEPARATOR;
import static com.alibaba.nacos.api.common.Constants.WORD_SEPARATOR;
/**
* Longpulling
* Longpolling
*
* @author Nacos
*/
@ -100,6 +102,8 @@ public class ClientWorker {
cacheMap.set(copy);
}
log.info(agent.getName(), "[unsubscribe] {}", groupKey);
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
}
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
@ -111,6 +115,8 @@ public class ClientWorker {
cacheMap.set(copy);
}
log.info(agent.getName(), "[unsubscribe] {}", groupKey);
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
}
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
@ -143,6 +149,8 @@ public class ClientWorker {
log.info(agent.getName(), "[subscribe] {}", key);
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
return cache;
}
@ -170,6 +178,9 @@ public class ClientWorker {
cacheMap.set(copy);
}
log.info(agent.getName(), "[subscribe] {}", key);
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
return cache;
}
@ -289,7 +300,7 @@ public class ClientWorker {
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想 任务列表现在是无序的变化过程可能有问题
executorService.execute(new LongPullingRunnable(i));
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
@ -406,9 +417,10 @@ public class ClientWorker {
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ClientWorker(final ServerHttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -423,7 +435,7 @@ public class ClientWorker {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPulling" + agent.getName());
t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
t.setDaemon(true);
return t;
}
@ -440,10 +452,10 @@ public class ClientWorker {
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
class LongPullingRunnable implements Runnable {
class LongPollingRunnable implements Runnable {
private int taskId;
public LongPullingRunnable(int taskId) {
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@ -498,7 +510,7 @@ public class ClientWorker {
}
inInitializingCacheList.clear();
} catch (Throwable e) {
log.error("500", "longPulling error", e);
log.error("500", "longPolling error", e);
} finally {
executorService.execute(this);
}
@ -522,9 +534,9 @@ public class ClientWorker {
*/
AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
new HashMap<String, CacheData>());
ServerHttpAgent agent;
HttpAgent agent;
ConfigFilterChainManager configFilterChainManager;
private boolean isHealthServer = true;
private double currentLongingTaskCount = 0;
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.monitor;
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Metrics Monitor
*
* @author Nacos
*/
public class MetricsMonitor {
private static AtomicInteger serviceInfoMapSize = new AtomicInteger();
private static AtomicInteger dom2BeatSize = new AtomicInteger();
private static AtomicInteger listenConfigCount = new AtomicInteger();
static {
List<Tag> tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "subServiceCount"));
Metrics.gauge("nacos_monitor", tags, serviceInfoMapSize);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "pubServiceCount"));
Metrics.gauge("nacos_monitor", tags, dom2BeatSize);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "config"));
tags.add(new ImmutableTag("name", "listenConfigCount"));
Metrics.gauge("nacos_monitor", tags, listenConfigCount);
}
public static AtomicInteger getServiceInfoMapSizeMonitor() {
return serviceInfoMapSize;
}
public static AtomicInteger getDom2BeatSizeMonitor() {
return dom2BeatSize;
}
public static AtomicInteger getListenConfigCountMonitor() {
return listenConfigCount;
}
public static Timer getConfigRequestMonitor(String method, String url, String code) {
return Metrics.timer("nacos_client_request",
"module", "config",
"method", method,
"url", url,
"code", code);
}
public static Timer getNamingRequestMonitor(String method, String url, String code) {
return Metrics.timer("nacos_client_request",
"module", "naming",
"method", method,
"url", url,
"code", code);
}
}

View File

@ -195,9 +195,24 @@ public class NacosNamingService implements NamingService {
}
@Override
public List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException {
public List<Instance> getAllInstances(String serviceName, boolean subscribe) throws NacosException {
return getAllInstances(serviceName, new ArrayList<String>(), subscribe);
}
ServiceInfo serviceInfo = hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ","));
@Override
public List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException {
return getAllInstances(serviceName, clusters, true);
}
@Override
public List<Instance> getAllInstances(String serviceName, List<String> clusters, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(serviceName, StringUtils.join(clusters, ","));
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
@ -206,40 +221,58 @@ public class NacosNamingService implements NamingService {
}
@Override
public List<Instance> selectInstances(String serviceName, boolean healthyOnly) throws NacosException {
return selectInstances(serviceName, new ArrayList<String>(), healthyOnly);
public List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException {
return selectInstances(serviceName, new ArrayList<String>(), healthy);
}
@Override
public List<Instance> selectInstances(String serviceName, boolean healthy, boolean subscribe) throws NacosException {
return selectInstances(serviceName, new ArrayList<String>(), healthy, subscribe);
}
@Override
public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy)
throws NacosException {
ServiceInfo serviceInfo = hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ","));
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
Iterator<Instance> iterator = list.iterator();
while (iterator.hasNext()) {
Instance instance = iterator.next();
if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
iterator.remove();
}
}
return list;
return selectInstances(serviceName, clusters, healthy, true);
}
@Override
public Instance selectOneHealthyInstance(String serviceName) {
public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(serviceName, StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}
@Override
public Instance selectOneHealthyInstance(String serviceName) throws NacosException {
return selectOneHealthyInstance(serviceName, new ArrayList<String>());
}
@Override
public Instance selectOneHealthyInstance(String serviceName, List<String> clusters) {
public Instance selectOneHealthyInstance(String serviceName, boolean subscribe) throws NacosException {
return selectOneHealthyInstance(serviceName, new ArrayList<String>(), subscribe);
}
@Override
public Instance selectOneHealthyInstance(String serviceName, List<String> clusters) throws NacosException {
return selectOneHealthyInstance(serviceName, clusters, true);
}
@Override
public Instance selectOneHealthyInstance(String serviceName, List<String> clusters, boolean subscribe) throws NacosException {
if (subscribe) {
return Balancer.RandomByWeight.selectHost(
hostReactor.getServiceInfo(serviceName, StringUtils.join(clusters, ",")));
} else {
return Balancer.RandomByWeight.selectHost(
hostReactor.getServiceInfoDirectlyFromServer(serviceName, StringUtils.join(clusters, ",")));
}
}
@Override
@ -284,6 +317,23 @@ public class NacosNamingService implements NamingService {
return serverProxy.serverHealthy() ? "UP" : "DOWN";
}
private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
Iterator<Instance> iterator = list.iterator();
while (iterator.hasNext()) {
Instance instance = iterator.next();
if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
iterator.remove();
}
}
return list;
}
public BeatReactor getBeatReactor() {
return beatReactor;
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.client.naming.beat;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.net.NamingProxy;
import com.alibaba.nacos.client.naming.utils.LogUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
@ -59,11 +60,13 @@ public class BeatReactor {
public void addBeatInfo(String dom, BeatInfo beatInfo) {
LogUtils.LOG.info("BEAT", "adding beat: {} to beat map.", beatInfo);
dom2Beat.put(buildKey(dom, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
public void removeBeatInfo(String dom, String ip, int port) {
LogUtils.LOG.info("BEAT", "removing beat: {}:{}:{} from beat map.", dom, ip, port);
dom2Beat.remove(buildKey(dom, ip, port));
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
public String buildKey(String dom, String ip, int port) {

View File

@ -16,8 +16,10 @@
package com.alibaba.nacos.client.naming.core;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.backups.FailoverReactor;
import com.alibaba.nacos.client.naming.cache.DiskCache;
import com.alibaba.nacos.client.naming.net.NamingProxy;
@ -44,7 +46,7 @@ public class HostReactor {
private Map<String, Object> updatingMap;
private PushRecver pushRecver;
private PushReceiver pushReceiver;
private EventDispatcher eventDispatcher;
@ -84,7 +86,7 @@ public class HostReactor {
this.updatingMap = new ConcurrentHashMap<String, Object>();
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushRecver = new PushRecver(this);
this.pushReceiver = new PushReceiver(this);
}
public Map<String, ServiceInfo> getServiceInfoMap() {
@ -188,6 +190,8 @@ public class HostReactor {
DiskCache.write(serviceInfo, cacheDir);
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
LogUtils.LOG.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getName() +
" -> " + JSON.toJSONString(serviceInfo.getHosts()));
@ -201,6 +205,14 @@ public class HostReactor {
return serviceInfoMap.get(key);
}
public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters) throws NacosException {
String result = serverProxy.queryList(serviceName, clusters, 0, false);
if (StringUtils.isNotEmpty(result)) {
return JSON.parseObject(result, ServiceInfo.class);
}
return null;
}
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
LogUtils.LOG.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
@ -260,7 +272,7 @@ public class HostReactor {
ServiceInfo oldService = getSerivceInfo0(serviceName, clusters);
try {
String result = serverProxy.queryList(serviceName, clusters, pushRecver.getUDPPort(), false);
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJSON(result);
}
@ -277,7 +289,7 @@ public class HostReactor {
public void refreshOnly(String serviceName, String clusters) {
try {
serverProxy.queryList(serviceName, clusters, pushRecver.getUDPPort(), false);
serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
} catch (Exception e) {
LogUtils.LOG.error("NA", "failed to update serviceName: " + serviceName, e);
}

View File

@ -30,7 +30,7 @@ import java.util.concurrent.ThreadFactory;
/**
* @author xuanyin
*/
public class PushRecver implements Runnable {
public class PushReceiver implements Runnable {
private ScheduledExecutorService executorService;
@ -40,7 +40,7 @@ public class PushRecver implements Runnable {
private HostReactor hostReactor;
public PushRecver(HostReactor hostReactor) {
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
udpSocket = new DatagramSocket();

View File

@ -25,6 +25,7 @@ import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.api.selector.ExpressionSelector;
import com.alibaba.nacos.api.selector.SelectorType;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.utils.*;
import com.alibaba.nacos.common.util.HttpMethod;
@ -295,6 +296,8 @@ public class NamingProxy {
public String callServer(String api, Map<String, String> params, String curServer, String method)
throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
List<String> headers = Arrays.asList("Client-Version", UtilAndComs.VERSION,
"Accept-Encoding", "gzip,deflate,sdch",
@ -310,6 +313,10 @@ public class NamingProxy {
url = HttpClient.getPrefix() + curServer + api;
HttpClient.HttpResult result = HttpClient.request(url, headers, params, UtilAndComs.ENCODING, method);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
.record(end - start, TimeUnit.MILLISECONDS);
if (HttpURLConnection.HTTP_OK == result.code) {
return result.content;

View File

@ -126,6 +126,26 @@
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-elastic</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -17,6 +17,7 @@ package com.alibaba.nacos.config.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.net.UnknownHostException;
@ -25,6 +26,7 @@ import java.net.UnknownHostException;
*
* @author Nacos
*/
@EnableScheduling
@SpringBootApplication
public class Config {

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.config.server.aspect;
import com.alibaba.nacos.config.server.service.ConfigService;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.MD5;
@ -23,6 +24,7 @@ import com.alibaba.nacos.config.server.utils.RequestUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -33,8 +35,8 @@ import javax.servlet.http.HttpServletResponse;
* @author Nacos
*/
@Aspect
@Component
public class RequestLogAspect {
/**
* publish config
*/
@ -54,22 +56,24 @@ public class RequestLogAspect {
*/
private static final String CLIENT_INTERFACE_REMOVE_ALL_CONFIG
= "execution(* com.alibaba.nacos.config.server.controller.ConfigController.deleteConfig(..)) && args(request,"
+ "response,dataId,group,..)";
+ "response,dataId,group,tenant,..)";
/**
* publishSingle
*/
* */
@Around(CLIENT_INTERFACE_PUBLISH_SINGLE_CONFIG)
public Object interfacePublishSingle(ProceedingJoinPoint pjp, HttpServletRequest request,
HttpServletResponse response, String dataId, String group, String tenant,
String content) throws Throwable {
final String md5 = content == null ? null : MD5.getInstance().getMD5String(content);
MetricsMonitor.getPublishMonitor().incrementAndGet();
return logClientRequest("publish", pjp, request, response, dataId, group, tenant, md5);
}
/**
* removeAll
*/
* */
@Around(CLIENT_INTERFACE_REMOVE_ALL_CONFIG)
public Object interfaceRemoveAll(ProceedingJoinPoint pjp, HttpServletRequest request, HttpServletResponse response,
String dataId, String group, String tenant) throws Throwable {
@ -84,6 +88,7 @@ public class RequestLogAspect {
String dataId, String group, String tenant) throws Throwable {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
final String md5 = ConfigService.getContentMd5(groupKey);
MetricsMonitor.getConfigMonitor().incrementAndGet();
return logClientRequest("get", pjp, request, response, dataId, group, tenant, md5);
}

View File

@ -60,7 +60,7 @@ public class ConfigServletInner {
private static final int TRY_GET_LOCK_TIMES = 9;
private static final int START_LONGPULLING_VERSION_NUM = 204;
private static final int START_LONGPOLLING_VERSION_NUM = 204;
/**
* 轮询接口
@ -70,8 +70,8 @@ public class ConfigServletInner {
throws IOException, ServletException {
// 长轮询
if (LongPollingService.isSupportLongPulling(request)) {
longPollingService.addLongPullingClient(request, response, clientMd5Map, probeRequestSize);
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
@ -91,7 +91,7 @@ public class ConfigServletInner {
/**
* 2.0.4版本以前, 返回值放入header中
*/
if (versionNum < START_LONGPULLING_VERSION_NUM) {
if (versionNum < START_LONGPOLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {

View File

@ -35,7 +35,7 @@ import java.util.HashMap;
import java.util.Map;
/**
* Config longpulling
* Config longpolling
*
* @author Nacos
*/

View File

@ -15,6 +15,9 @@
*/
package com.alibaba.nacos.config.server.exception;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
@ -37,6 +40,7 @@ public class GlobalExceptionHandler {
*/
@ExceptionHandler(IllegalArgumentException.class)
public void handleIllegalArgumentException(HttpServletResponse response, Exception ex) throws IOException {
MetricsMonitor.getIllegalArgumentException().increment();
response.setStatus(400);
if (ex.getMessage() != null) {
response.getWriter().println(ex.getMessage());
@ -52,6 +56,7 @@ public class GlobalExceptionHandler {
*/
@ExceptionHandler(NacosException.class)
public void handleNacosException(HttpServletResponse response, NacosException ex) throws IOException {
MetricsMonitor.getNacosException().increment();
response.setStatus(ex.getErrCode());
if (ex.getErrMsg() != null) {
response.getWriter().println(ex.getErrMsg());
@ -60,4 +65,15 @@ public class GlobalExceptionHandler {
}
}
/**
* For DataAccessException
*
* @throws DataAccessException
*/
@ExceptionHandler(DataAccessException.class)
public void handleDataAccessException(HttpServletResponse response, DataAccessException ex) throws DataAccessException {
MetricsMonitor.getDbException().increment();
throw new CannotGetJdbcConnectionException(ex.getMessage());
}
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.config.server.manager;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import org.slf4j.Logger;
@ -51,6 +52,7 @@ public final class TaskManager implements TaskManagerMBean {
private String name;
class ProcessRunnable implements Runnable {
public void run() {
@ -140,6 +142,7 @@ public final class TaskManager implements TaskManagerMBean {
this.lock.lock();
try {
this.tasks.remove(type);
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
} finally {
this.lock.unlock();
}
@ -150,12 +153,12 @@ public final class TaskManager implements TaskManagerMBean {
*
* @param type
* @param task
* @param previousTask
*/
public void addTask(String type, AbstractTask task) {
this.lock.lock();
try {
AbstractTask oldTask = tasks.put(type, task);
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
if (null != oldTask) {
task.merge(oldTask);
}
@ -181,6 +184,7 @@ public final class TaskManager implements TaskManagerMBean {
}
// 先将任务从任务Map中删除
this.tasks.remove(entry.getKey());
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}
} finally {
this.lock.unlock();

View File

@ -20,10 +20,12 @@ import com.alibaba.nacos.config.server.service.ConfigService;
import com.alibaba.nacos.config.server.service.TimerTaskService;
import com.alibaba.nacos.config.server.service.notify.AsyncNotifyService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static com.alibaba.nacos.config.server.utils.LogUtil.memoryLog;
@ -34,6 +36,7 @@ import static com.alibaba.nacos.config.server.utils.LogUtil.memoryLog;
*/
@Service
public class MemoryMonitor {
@Autowired
public MemoryMonitor(AsyncNotifyService notifySingleService) {
@ -45,9 +48,16 @@ public class MemoryMonitor {
TimerTaskService.scheduleWithFixedDelay(new NotifyTaskQueueMonitorTask(notifySingleService), DELAY_SECONDS,
DELAY_SECONDS, TimeUnit.SECONDS);
}
static final long DELAY_SECONDS = 10;
@Scheduled(cron = "0 0 0 * * ?")
public void clear() {
MetricsMonitor.getConfigMonitor().set(0);
MetricsMonitor.getPublishMonitor().set(0);
}
}
class PrintGetConfigResponeTask implements Runnable {
@ -58,6 +68,7 @@ class PrintGetConfigResponeTask implements Runnable {
}
class PrintMemoryTask implements Runnable {
@Override
public void run() {
int groupCount = ConfigService.groupCount();
@ -65,11 +76,14 @@ class PrintMemoryTask implements Runnable {
long subCount = ClientTrackService.subscriberCount();
memoryLog.info("groupCount={}, subscriberClientCount={}, subscriberCount={}", groupCount, subClientCount,
subCount);
MetricsMonitor.getConfigCountMonitor().set(groupCount);
}
}
class NotifyTaskQueueMonitorTask implements Runnable {
final private AsyncNotifyService notifySingleService;
private AtomicInteger notifyTask = new AtomicInteger();
NotifyTaskQueueMonitorTask(AsyncNotifyService notifySingleService) {
this.notifySingleService = notifySingleService;
@ -77,16 +91,10 @@ class NotifyTaskQueueMonitorTask implements Runnable {
@Override
public void run() {
int size = ((ScheduledThreadPoolExecutor)notifySingleService.getExecutor()).getQueue().size();
memoryLog.info("notifySingleServiceThreadPool-{}, toNotifyTaskSize={}",
new Object[] {((ScheduledThreadPoolExecutor)notifySingleService.getExecutor()).getClass().getName(),
((ScheduledThreadPoolExecutor)notifySingleService.getExecutor()).getQueue().size()});
// for(Map.Entry<String, Executor> entry: notifySingleService.getExecutors().entrySet()) {
// ThreadPoolExecutor pool = (ThreadPoolExecutor) entry.getValue();
// String target = entry.getKey();
// memoryLog.info("notifySingleServiceThreadPool-{}, toNotifyTaskSize={}",
// new Object[] { target, pool.getQueue().size() });
// }
size});
MetricsMonitor.getNotifyTaskMonitor().set(size);
}
}

View File

@ -0,0 +1,124 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.monitor;
import io.micrometer.core.instrument.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Metrics Monitor
*
* @author Nacos
*/
public class MetricsMonitor {
private static AtomicInteger getConfig = new AtomicInteger();
private static AtomicInteger publish = new AtomicInteger();
private static AtomicInteger longPolling = new AtomicInteger();
private static AtomicInteger configCount = new AtomicInteger();
private static AtomicInteger notifyTask = new AtomicInteger();
private static AtomicInteger dumpTask = new AtomicInteger();
static {
List<Tag> tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "config"));
tags.add(new ImmutableTag("name", "getConfig"));
Metrics.gauge("nacos_monitor", tags, getConfig);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "config"));
tags.add(new ImmutableTag("name", "publish"));
Metrics.gauge("nacos_monitor", tags, publish);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "config"));
tags.add(new ImmutableTag("name", "longPolling"));
Metrics.gauge("nacos_monitor", tags, longPolling);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "config"));
tags.add(new ImmutableTag("name", "configCount"));
Metrics.gauge("nacos_monitor", tags, configCount);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "config"));
tags.add(new ImmutableTag("name", "notifyTask"));
Metrics.gauge("nacos_monitor", tags, notifyTask);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "config"));
tags.add(new ImmutableTag("name", "dumpTask"));
Metrics.gauge("nacos_monitor", tags, dumpTask);
}
public static AtomicInteger getConfigMonitor() {
return getConfig;
}
public static AtomicInteger getPublishMonitor() {
return publish;
}
public static AtomicInteger getLongPollingMonitor() {
return longPolling;
}
public static AtomicInteger getConfigCountMonitor() {
return configCount;
}
public static AtomicInteger getNotifyTaskMonitor() {
return notifyTask;
}
public static AtomicInteger getDumpTaskMonitor() {
return dumpTask;
}
public static Timer getNotifyRtTimer() {
return Metrics.timer("nacos_timer",
"module", "config", "name", "notifyRt");
}
public static Counter getIllegalArgumentException() {
return Metrics.counter("nacos_exception",
"module", "config", "name", "illegalArgument");
}
public static Counter getNacosException() {
return Metrics.counter("nacos_exception",
"module", "config", "name", "nacos");
}
public static Counter getDbException() {
return Metrics.counter("nacos_exception",
"module", "config", "name", "db");
}
public static Counter getConfigNotifyException() {
return Metrics.counter("nacos_exception",
"module", "config", "name", "configNotify");
}
public static Counter getUnhealthException() {
return Metrics.counter("nacos_exception",
"module", "config", "name", "unhealth");
}
}

View File

@ -15,6 +15,7 @@
*/
package com.alibaba.nacos.config.server.service;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -309,6 +310,7 @@ public class BasicDataSourceServiceImpl implements DataSourceService {
if (!isFound) {
fatalLog.error("[master-db] master db not found.");
MetricsMonitor.getDbException().increment();
}
}
}
@ -332,6 +334,8 @@ public class BasicDataSourceServiceImpl implements DataSourceService {
fatalLog.error("[db-error] slave db {} down.", getIpFromUrl(dataSourceList.get(i).getUrl()));
}
isHealthList.set(i, Boolean.FALSE);
MetricsMonitor.getDbException().increment();
}
}
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.config.server.service;
import com.alibaba.nacos.config.server.model.SampleResult;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.utils.GroupKey;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.MD5Util;
@ -65,7 +66,7 @@ public class LongPollingService extends AbstractEventListener {
}
public Map<String, String> getClientSubConfigInfo(String clientIp) {
ClientLongPulling record = getClientPollingRecord(clientIp);
ClientLongPolling record = getClientPollingRecord(clientIp);
if (record == null) {
return Collections.<String, String>emptyMap();
@ -79,9 +80,9 @@ public class LongPollingService extends AbstractEventListener {
SampleResult sampleResult = new SampleResult();
Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);
for (ClientLongPulling clientLongPulling : allSubs) {
if (clientLongPulling.clientMd5Map.containsKey(groupKey)) {
lisentersGroupkeyStatus.put(clientLongPulling.ip, clientLongPulling.clientMd5Map.get(groupKey));
for (ClientLongPolling clientLongPolling : allSubs) {
if (clientLongPolling.clientMd5Map.containsKey(groupKey)) {
lisentersGroupkeyStatus.put(clientLongPolling.ip, clientLongPolling.clientMd5Map.get(groupKey));
}
}
sampleResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);
@ -92,11 +93,11 @@ public class LongPollingService extends AbstractEventListener {
SampleResult sampleResult = new SampleResult();
Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);
for (ClientLongPulling clientLongPulling : allSubs) {
if (clientLongPulling.ip.equals(clientIp)) {
for (ClientLongPolling clientLongPolling : allSubs) {
if (clientLongPolling.ip.equals(clientIp)) {
// 一个ip可能有多个监听
if (!lisentersGroupkeyStatus.equals(clientLongPulling.clientMd5Map)) {
lisentersGroupkeyStatus.putAll(clientLongPulling.clientMd5Map);
if (!lisentersGroupkeyStatus.equals(clientLongPolling.clientMd5Map)) {
lisentersGroupkeyStatus.putAll(clientLongPolling.clientMd5Map);
}
}
}
@ -128,18 +129,18 @@ public class LongPollingService extends AbstractEventListener {
return null;
}
HashMap<String, Set<String>> app2Groupkeys = new HashMap<String, Set<String>>(50);
for (ClientLongPulling clientLongPulling : allSubs) {
if (StringUtils.isEmpty(clientLongPulling.appName) || "unknown".equalsIgnoreCase(
clientLongPulling.appName)) {
for (ClientLongPolling clientLongPolling : allSubs) {
if (StringUtils.isEmpty(clientLongPolling.appName) || "unknown".equalsIgnoreCase(
clientLongPolling.appName)) {
continue;
}
Set<String> appSubscribeConfigs = app2Groupkeys.get(clientLongPulling.appName);
Set<String> clientSubscribeConfigs = clientLongPulling.clientMd5Map.keySet();
Set<String> appSubscribeConfigs = app2Groupkeys.get(clientLongPolling.appName);
Set<String> clientSubscribeConfigs = clientLongPolling.clientMd5Map.keySet();
if (appSubscribeConfigs == null) {
appSubscribeConfigs = new HashSet<String>(clientSubscribeConfigs.size());
}
appSubscribeConfigs.addAll(clientSubscribeConfigs);
app2Groupkeys.put(clientLongPulling.appName, appSubscribeConfigs);
app2Groupkeys.put(clientLongPolling.appName, appSubscribeConfigs);
}
return app2Groupkeys;
@ -187,27 +188,27 @@ public class LongPollingService extends AbstractEventListener {
return sampleResult;
}
private ClientLongPulling getClientPollingRecord(String clientIp) {
private ClientLongPolling getClientPollingRecord(String clientIp) {
if (allSubs == null) {
return null;
}
for (ClientLongPulling clientLongPulling : allSubs) {
HttpServletRequest request = (HttpServletRequest)clientLongPulling.asyncContext.getRequest();
for (ClientLongPolling clientLongPolling : allSubs) {
HttpServletRequest request = (HttpServletRequest) clientLongPolling.asyncContext.getRequest();
if (clientIp.equals(RequestUtil.getRemoteIp(request))) {
return clientLongPulling;
return clientLongPolling;
}
}
return null;
}
public void addLongPullingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_PULLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_PULLING_NO_HANG_UP_HEADER);
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
@ -241,7 +242,7 @@ public class LongPollingService extends AbstractEventListener {
asyncContext.setTimeout(0L);
scheduler.execute(
new ClientLongPulling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
@Override
@ -263,20 +264,20 @@ public class LongPollingService extends AbstractEventListener {
}
}
static public boolean isSupportLongPulling(HttpServletRequest req) {
return null != req.getHeader(LONG_PULLING_HEADER);
static public boolean isSupportLongPolling(HttpServletRequest req) {
return null != req.getHeader(LONG_POLLING_HEADER);
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public LongPollingService() {
allSubs = new ConcurrentLinkedQueue<ClientLongPulling>();
allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.LongPulling");
t.setName("com.alibaba.nacos.LongPolling");
return t;
}
});
@ -285,15 +286,15 @@ public class LongPollingService extends AbstractEventListener {
// =================
static public final String LONG_PULLING_HEADER = "Long-Pulling-Timeout";
static public final String LONG_PULLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";
static public final String LONG_POLLING_HEADER = "Long-Pulling-Timeout";
static public final String LONG_POLLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";
final ScheduledExecutorService scheduler;
/**
* 长轮询订阅关系
*/
final Queue<ClientLongPulling> allSubs;
final Queue<ClientLongPolling> allSubs;
// =================
@ -302,8 +303,8 @@ public class LongPollingService extends AbstractEventListener {
public void run() {
try {
ConfigService.getContentBetaMd5(groupKey);
for (Iterator<ClientLongPulling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPulling clientSub = iter.next();
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// 如果beta发布且不在beta列表直接跳过
if (isBeta && !betaIps.contains(clientSub.ip)) {
@ -359,23 +360,24 @@ public class LongPollingService extends AbstractEventListener {
@Override
public void run() {
memoryLog.info("[long-pulling] client count " + allSubs.size());
MetricsMonitor.getLongPollingMonitor().set(allSubs.size());
}
}
// =================
class ClientLongPulling implements Runnable {
class ClientLongPolling implements Runnable {
@Override
public void run() {
asyncTimeoutFuture = scheduler.schedule(new Runnable() {
public void run() {
try {
getRetainIps().put(ClientLongPulling.this.ip, System.currentTimeMillis());
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
/**
* 删除订阅关系
*/
allSubs.remove(ClientLongPulling.this);
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
@ -400,7 +402,7 @@ public class LongPollingService extends AbstractEventListener {
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.defaultLog.error("long pulling error:" + t.getMessage(), t.getCause());
LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
}
}
@ -446,7 +448,7 @@ public class LongPollingService extends AbstractEventListener {
}
}
ClientLongPulling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize,
ClientLongPolling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize,
long timeoutTime, String appName, String tag) {
this.asyncContext = ac;
this.clientMd5Map = clientMd5Map;

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.config.server.service;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.service.notify.NotifyService;
import com.alibaba.nacos.config.server.service.notify.NotifyService.HttpResult;
import com.alibaba.nacos.config.server.utils.LogUtil;
@ -377,6 +378,7 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
serverListUnhealth.add(serverIp);
}
defaultLog.error("unhealthIp:{}, unhealthCount:{}", serverIp, failCount);
MetricsMonitor.getUnhealthException().increment();
}
}
@ -391,6 +393,7 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
serverListUnhealth.add(serverIp);
}
defaultLog.error("unhealthIp:{}, unhealthCount:{}", serverIp, failCount);
MetricsMonitor.getUnhealthException().increment();
}
}
}

View File

@ -17,6 +17,7 @@ package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.service.ServerListService;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.*;
@ -192,6 +193,7 @@ public class AsyncNotifyService extends AbstractEventListener {
ConfigTraceService.NOTIFY_EVENT_ERROR, delayed,
task.target);
//get delay time and set fail count to the task
int delay = getDelayTime(task);
@ -207,6 +209,7 @@ public class AsyncNotifyService extends AbstractEventListener {
new Object[] {task.target, task.getDataId(),
task.getGroup(), task.getLastModified()});
MetricsMonitor.getConfigNotifyException().increment();
}
HttpClientUtils.closeQuietly(response);
}
@ -238,6 +241,7 @@ public class AsyncNotifyService extends AbstractEventListener {
new Object[] {task.target, task.getDataId(),
task.getGroup(), task.getLastModified()});
MetricsMonitor.getConfigNotifyException().increment();
}
@Override
@ -262,6 +266,7 @@ public class AsyncNotifyService extends AbstractEventListener {
new Object[] {task.target, task.getDataId(),
task.getGroup(), task.getLastModified()});
MetricsMonitor.getConfigNotifyException().increment();
}
private NotifySingleTask task;

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.manager.AbstractTask;
import com.alibaba.nacos.config.server.manager.TaskProcessor;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.service.ServerListService;
import com.alibaba.nacos.config.server.service.notify.NotifyService.HttpResult;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.common.util.SystemUtils.LOCAL_IP;
@ -76,8 +78,12 @@ public class NotifyTaskProcessor implements TaskProcessor {
if (result.code == HttpStatus.SC_OK) {
ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, LOCAL_IP,
ConfigTraceService.NOTIFY_EVENT_OK, delayed, serverIp);
MetricsMonitor.getNotifyRtTimer().record(delayed, TimeUnit.MILLISECONDS);
return true;
} else {
MetricsMonitor.getConfigNotifyException().increment();
log.error("[notify-error] {}, {}, to {}, result {}", new Object[] {dataId, group,
serverIp, result.code});
ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, LOCAL_IP,
@ -85,6 +91,7 @@ public class NotifyTaskProcessor implements TaskProcessor {
return false;
}
} catch (Exception e) {
MetricsMonitor.getConfigNotifyException().increment();
log.error(
"[notify-exception] " + dataId + ", " + group + ", to " + serverIp + ", "
+ e.toString());

View File

@ -15,11 +15,14 @@
*/
package com.alibaba.nacos.config.server.service.trace;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.MD5;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.common.util.SystemUtils.LOCAL_IP;
/**
@ -68,6 +71,7 @@ public class ConfigTraceService {
if (!LogUtil.traceLog.isInfoEnabled()) {
return;
}
MetricsMonitor.getNotifyRtTimer().record(delayed, TimeUnit.MILLISECONDS);
// 方便tlog切分
if (StringUtils.isBlank(tenant)) {
tenant = null;

View File

@ -19,12 +19,14 @@ package com.alibaba.nacos;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @author nacos
*/
@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {
public static void main(String[] args) {

View File

@ -9,6 +9,23 @@ nacos.cmdb.eventTaskInterval=10
nacos.cmdb.labelTaskInterval=300
nacos.cmdb.loadDataAtStart=false
# metrics for prometheus
#management.endpoints.web.exposure.include=*
# metrics for elastic search
management.metrics.export.elastic.enabled=false
#management.metrics.export.elastic.host=http://localhost:9200
# metrics for influx
management.metrics.export.influx.enabled=false
#management.metrics.export.influx.db=springboot
#management.metrics.export.influx.uri=http://localhost:8086
#management.metrics.export.influx.auto-create-db=true
#management.metrics.export.influx.consistency=one
#management.metrics.export.influx.compressed=true
server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D
server.tomcat.basedir=/home/admin/nacos
# default current work dir
server.tomcat.basedir=

View File

@ -158,7 +158,22 @@
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-cmdb</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-elastic</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -17,12 +17,14 @@ package com.alibaba.nacos.naming;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* Hello world!
*
* @author xxc
*/
@EnableScheduling
@SpringBootApplication
public class NamingApp {

View File

@ -22,6 +22,7 @@ import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.misc.*;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.Response;
import org.apache.commons.collections.CollectionUtils;
@ -544,6 +545,7 @@ public class RaftCore {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
response.getResponseBody(), server);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
return 1;
}
@ -555,10 +557,12 @@ public class RaftCore {
@Override
public void onThrowable(Throwable t) {
Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
});
} catch (Exception e) {
Loggers.RAFT.error("VIPSRV error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.naming.consistency.cp.simpleraft;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@ -154,6 +155,8 @@ public class RaftStore {
}
if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {
MetricsMonitor.getDiskException().increment();
throw new IllegalStateException("can not make cache file: " + cacheFile.getName());
}
@ -164,6 +167,9 @@ public class RaftStore {
fc = new FileOutputStream(cacheFile, false).getChannel();
fc.write(data, data.position());
fc.force(true);
} catch (Exception e) {
MetricsMonitor.getDiskException().increment();
throw e;
} finally {
if (fc != null) {
fc.close();

View File

@ -15,16 +15,89 @@
*/
package com.alibaba.nacos.naming.controllers;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.DomainsManager;
import com.alibaba.nacos.naming.core.IpAddress;
import com.alibaba.nacos.naming.core.VirtualClusterDomain;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.web.ApiCommands;
import com.alibaba.nacos.naming.push.PushService;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.Map;
/**
* @author <a href="mailto:zpf.073@gmail.com">nkorange</a>
* @author nanamikon
* @since 0.8.0
*/
@RestController("namingHealthController")
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_HEALTH_CONTEXT)
public class HealthController extends ApiCommands {
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/health")
public class HealthController {
@Autowired
private DomainsManager domainsManager;
@RequestMapping(method = {RequestMethod.POST, RequestMethod.PUT})
public String update(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, Constants.REQUEST_PARAM_NAMESPACE_ID,
UtilsAndCommons.getDefaultNamespaceId());
String dom = WebUtils.required(request, "serviceName");
String ip = WebUtils.required(request, "ip");
int port = Integer.parseInt(WebUtils.required(request, "port"));
boolean valid = Boolean.valueOf(WebUtils.required(request, "valid"));
String clusterName = WebUtils.optional(request, "clusterName", UtilsAndCommons.DEFAULT_CLUSTER_NAME);
if (!DistroMapper.responsible(dom)) {
String server = DistroMapper.mapSrv(dom);
Loggers.EVT_LOG.info("I'm not responsible for " + dom + ", proxy it to " + server);
Map<String, String> proxyParams = new HashMap<>(16);
for (Map.Entry<String, String[]> entry : request.getParameterMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue()[0];
proxyParams.put(key, value);
}
if (!server.contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
server = server + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort();
}
String url = "http://" + server + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/health";
HttpClient.HttpResult httpResult = HttpClient.httpPost(url, null, proxyParams);
if (httpResult.code != HttpURLConnection.HTTP_OK) {
throw new IllegalArgumentException("failed to proxy health update to " + server + ", dom: " + dom);
}
} else {
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
// Only health check "none" need update health status with api
if (!virtualClusterDomain.getEnableHealthCheck() && !virtualClusterDomain.getEnableClientBeat()) {
for (IpAddress ipAddress : virtualClusterDomain.allIPs(Lists.newArrayList(clusterName))) {
if (ipAddress.getIp().equals(ip) && ipAddress.getPort() == port) {
ipAddress.setValid(valid);
Loggers.EVT_LOG.info((valid ? "[IP-ENABLED]" : "[IP-DISABLED]") + " ips: "
+ ipAddress.getIp() + ":" + ipAddress.getPort() + "@" + ipAddress.getClusterName()
+ ", dom: " + dom + ", msg: update thought HealthController api");
PushService.domChanged(namespaceId, virtualClusterDomain.getName());
break;
}
}
} else {
throw new IllegalArgumentException("health check mode 'client' and 'server' are not supported , dom: " + dom);
}
}
return "ok";
}
}

View File

@ -271,7 +271,7 @@ public class ServiceController {
continue;
}
for (IpAddress address : serviceObj.allIPs()) {
if (value.equals(address.getMetadata().get(key))) {
if (address.getMetadata() != null && value.equals(address.getMetadata().get(key))) {
filteredServices.add(service);
break;
}

View File

@ -300,6 +300,9 @@ public class ServiceManager implements DataListener {
}
public List<String> getAllDomNamesList(String namespaceId) {
if (chooseDomMap(namespaceId) == null) {
return new ArrayList<>();
}
return new ArrayList<>(chooseDomMap(namespaceId).keySet());
}

View File

@ -15,6 +15,7 @@
*/
package com.alibaba.nacos.naming.exception;
import com.alibaba.nacos.naming.misc.Loggers;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.MissingServletRequestParameterException;
@ -29,22 +30,26 @@ public class ResponseExceptionHandler {
@ExceptionHandler(NacosException.class)
private ResponseEntity<String> handleNacosException(NacosException e) {
Loggers.SRV_LOG.error("got exception.", e);
return ResponseEntity.status(e.getErrorCode()).body(e.getMessage());
}
@ExceptionHandler(IllegalArgumentException.class)
public ResponseEntity<String> handleParameterError(IllegalArgumentException ex) {
Loggers.SRV_LOG.error("got exception.", ex);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
}
@ExceptionHandler(MissingServletRequestParameterException.class)
public ResponseEntity<String> handleMissingParams(MissingServletRequestParameterException ex) {
Loggers.SRV_LOG.error("got exception.", ex);
String name = ex.getParameterName();
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("Parameter '" + name + "' is missing");
}
@ExceptionHandler(Exception.class)
private ResponseEntity<String> handleNacosException(Exception e) {
Loggers.SRV_LOG.error("got exception.", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.toString());
}
}

View File

@ -133,6 +133,7 @@ public class HttpHealthCheckProcessor implements HealthCheckProcessor {
}
builder.execute(new HttpHealthCheckCallback(ip, task));
MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();
} catch (Throwable e) {
ip.setCheckRT(switchDomain.getHttpHealthParams().getMax());
healthCheckCommon.checkFail(ip, task, "http:error:" + e.getMessage());

View File

@ -120,6 +120,7 @@ public class MysqlHealthCheckProcessor implements HealthCheckProcessor {
}
EXECUTOR.execute(new MysqlCheckTask(ip, task));
MetricsMonitor.getMysqlHealthCheckMonitor().incrementAndGet();
} catch (Exception e) {
ip.setCheckRT(switchDomain.getMysqlHealthParams().getMax());
healthCheckCommon.checkFail(ip, task, "mysql:error:" + e.getMessage());

View File

@ -19,6 +19,8 @@ import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.IpAddress;
import com.alibaba.nacos.naming.core.VirtualClusterDomain;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.Switch;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -138,6 +140,7 @@ public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {
Beat beat = new Beat(ip, task);
taskQueue.add(beat);
MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
}
// selector.wakeup();

View File

@ -0,0 +1,146 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.monitor;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* Metrics Monitor
*
* @author Nacos
*/
public class MetricsMonitor {
private static AtomicInteger mysqlHealthCheck = new AtomicInteger();
private static AtomicInteger httpHealthCheck = new AtomicInteger();
private static AtomicInteger tcpHealthCheck = new AtomicInteger();
private static AtomicInteger domCount = new AtomicInteger();
private static AtomicInteger ipCount = new AtomicInteger();
private static AtomicLong maxPushCost = new AtomicLong();
private static AtomicLong avgPushCost = new AtomicLong();
private static AtomicLong leaderStatus = new AtomicLong();
private static AtomicInteger totalPush = new AtomicInteger();
private static AtomicInteger failedPush = new AtomicInteger();
static {
List<Tag> tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "mysqlhealthCheck"));
Metrics.gauge("nacos_monitor", tags, mysqlHealthCheck);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "httpHealthCheck"));
Metrics.gauge("nacos_monitor", tags, httpHealthCheck);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "tcpHealthCheck"));
Metrics.gauge("nacos_monitor", tags, tcpHealthCheck);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "domCount"));
Metrics.gauge("nacos_monitor", tags, domCount);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "ipCount"));
Metrics.gauge("nacos_monitor", tags, ipCount);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "maxPushCost"));
Metrics.gauge("nacos_monitor", tags, maxPushCost);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "avgPushCost"));
Metrics.gauge("nacos_monitor", tags, avgPushCost);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "leaderStatus"));
Metrics.gauge("nacos_monitor", tags, leaderStatus);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "totalPush"));
Metrics.gauge("nacos_monitor", tags, totalPush);
tags = new ArrayList<Tag>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", "failedPush"));
Metrics.gauge("nacos_monitor", tags, failedPush);
}
public static AtomicInteger getMysqlHealthCheckMonitor() {
return mysqlHealthCheck;
}
public static AtomicInteger getHttpHealthCheckMonitor() {
return httpHealthCheck;
}
public static AtomicInteger getTcpHealthCheckMonitor() {
return tcpHealthCheck;
}
public static AtomicInteger getDomCountMonitor() {
return domCount;
}
public static AtomicInteger getIpCountMonitor() {
return ipCount;
}
public static AtomicLong getMaxPushCostMonitor() {
return maxPushCost;
}
public static AtomicLong getAvgPushCostMonitor() {
return avgPushCost;
}
public static AtomicLong getLeaderStatusMonitor() {
return leaderStatus;
}
public static AtomicInteger getTotalPushMonitor() {
return totalPush;
}
public static AtomicInteger getFailedPushMonitor() {
return failedPush;
}
public static Counter getDiskException() {
return Metrics.counter("nacos_exception",
"module", "naming", "name", "disk");
}
public static Counter getLeaderSendBeatFailedException() {
return Metrics.counter("nacos_exception",
"module", "naming", "name", "leaderSendBeatFailed");
}
}

View File

@ -21,14 +21,20 @@ import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.push.PushService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.naming.raft.RaftPeer.State.FOLLOWER;
/**
* @author nacos
*/
@ -52,7 +58,7 @@ public class PerformanceLoggerThread {
}
});
private static final long PERIOD = 1 * 60 * 60;
private static final long PERIOD = 5 * 60;
private static final long HEALTH_CHECK_PERIOD = 5 * 60;
@PostConstruct
@ -80,6 +86,42 @@ public class PerformanceLoggerThread {
PerformanceLogTask task = new PerformanceLogTask();
executor.scheduleWithFixedDelay(task, 30, PERIOD, TimeUnit.SECONDS);
executor.scheduleWithFixedDelay(new HealthCheckSwitchTask(), 30, HEALTH_CHECK_PERIOD, TimeUnit.SECONDS);
}
@Scheduled(cron = "0 0 0 * * ?")
public void refreshMetrics() {
PushService.setFailedPush(0);
PushService.setTotalPush(0);
MetricsMonitor.getHttpHealthCheckMonitor().set(0);
MetricsMonitor.getMysqlHealthCheckMonitor().set(0);
MetricsMonitor.getTcpHealthCheckMonitor().set(0);
}
@Scheduled(cron = "0/15 * * * * ?")
public void collectmetrics() {
int domCount = domainsManager.getDomCount();
MetricsMonitor.getDomCountMonitor().set(domCount);
int ipCount = domainsManager.getInstanceCount();
MetricsMonitor.getIpCountMonitor().set(ipCount);
long maxPushCost = getMaxPushCost();
MetricsMonitor.getMaxPushCostMonitor().set(maxPushCost);
long avgPushCost = getAvgPushCost();
MetricsMonitor.getAvgPushCostMonitor().set(avgPushCost);
MetricsMonitor.getTotalPushMonitor().set(PushService.getTotalPush());
MetricsMonitor.getFailedPushMonitor().set(PushService.getFailedPushCount());
if (RaftCore.isLeader()) {
MetricsMonitor.getLeaderStatusMonitor().set(1);
} else if (RaftCore.getPeerSet().local().state == FOLLOWER) {
MetricsMonitor.getLeaderStatusMonitor().set(0);
} else {
MetricsMonitor.getLeaderStatusMonitor().set(2);
}
}
class PerformanceLogTask implements Runnable {
@ -90,8 +132,12 @@ public class PerformanceLoggerThread {
int domCount = serviceManager.getDomCount();
int ipCount = serviceManager.getInstanceCount();
long maxPushMaxCost = getMaxPushCost();
int domCount = domainsManager.getDomCount();
int ipCount = domainsManager.getInstanceCount();
long maxPushCost = getMaxPushCost();
long avgPushCost = getAvgPushCost();
Loggers.PERFORMANCE_LOG.info("[PERFORMANCE] " + "|" + domCount + "|" + ipCount + "|" + maxPushMaxCost + "|" + avgPushCost);
Loggers.PERFORMANCE_LOG.info("PERFORMANCE:" + "|" + domCount + "|" + ipCount + "|" + maxPushCost + "|" + avgPushCost);
} catch (Exception e) {
Loggers.SRV_LOG.warn("[PERFORMANCE] Exception while print performance log.", e);
}

View File

@ -126,6 +126,11 @@ public class PushService {
}
public void addClient(String namespaceId,
public static void setTotalPush(int totalPush) {
PushService.totalPush = totalPush;
}
public static void addClient(String namespaceId,
String dom,
String clusters,
String agent,
@ -306,6 +311,11 @@ public class PushService {
return ackMap.size() + failedPush;
}
public static void setFailedPush(int failedPush) {
PushService.failedPush = failedPush;
}
public static void resetPushState() {
ackMap.clear();
}

View File

@ -481,7 +481,7 @@
<!-- Import dependency management from Spring Boot -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.0.5.RELEASE</version>
<version>2.1.1.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>

View File

@ -18,16 +18,14 @@ package com.alibaba.nacos.test.config;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.AbstractListener;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.http.MetricsHttpAgent;
import com.alibaba.nacos.client.config.http.ServerHttpAgent;
import com.alibaba.nacos.client.config.impl.HttpSimpleClient.HttpResult;
import com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor;
import com.alibaba.nacos.client.config.impl.ServerHttpAgent;
import com.alibaba.nacos.client.logger.json.JSONObject;
import com.alibaba.nacos.client.utils.StringUtils;
import com.alibaba.nacos.client.config.http.HttpAgent;
import com.alibaba.nacos.config.server.Config;
import org.junit.After;
import org.junit.Assert;
@ -40,7 +38,6 @@ import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Arrays;
import java.util.List;
@ -57,7 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ConfigAPI_ITCase {
public static final long TIME_OUT = 2000;
public ConfigService iconfig = null;
ServerHttpAgent agent = null;
HttpAgent agent = null;
static final String CONFIG_CONTROLLER_PATH = "/v1/cs/configs";
String SPECIAL_CHARACTERS = "!@#$%^&*()_+-=_|/'?.";
@ -76,7 +73,7 @@ public class ConfigAPI_ITCase {
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1"+":"+port);
iconfig = NacosFactory.createConfigService(properties);
agent = new ServerHttpAgent(properties);
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
agent.start();
}

View File

@ -394,7 +394,7 @@ public class SelectInstances_ITCase {
TimeUnit.SECONDS.sleep(10);
expressionSelector.setExpression("INSTANCE.metadata.registerSource = 'spring'");
expressionSelector.setExpression("INSTANCE.label.registerSource = 'spring'");
serviceList = naming.getServicesOfServer(1, 10, expressionSelector);
Assert.assertTrue(serviceList.getData().contains(serviceName));