From 03345fd923c79ba2ca749076ce9a8dc38a496594 Mon Sep 17 00:00:00 2001
From: Liu Yixiao <72334292+beet233@users.noreply.github.com>
Date: Tue, 25 Oct 2022 11:31:29 +0800
Subject: [PATCH] [ISSUE #8461] Enhance Nacos monitor observability system
(#9038)
* Enhance MetricsMonitor with several new metrics
* Add config read and write rt
* Add ServiceEvent publisher queue size
* Add fuzzy search count
* Add naming subscriber and publisher count with v1 or v2 version tag
* Add config subscriber count with v1 or v2 version tag
* Add pending push task count and empty push count metrics
* collect pending push task count by scheduled task
* add unit test for increment metrics in NacosMonitorPushResultHook
* Add topn counter container and topn config change
* Create private MeterRegistry for config change
* Add scheduled clear for config change count, add removeAll for TopnCounterMetricsContainer
* Add topn service change
* Reconstruct metrics monitor with NacosMeterRegistryCenter.
* divide globalRegistry to different modules' CompositeMeterRegistry
* Add dynamic meters refresh service for config and naming module
* fix unit test
* Add try catch when init NacosMeterRegistryCenter
---
.../utils/TopnCounterMetricsContainer.java | 217 ++++++++++++++++++
.../TopnCounterMetricsContainerTest.java | 109 +++++++++
.../server/aspect/RequestLogAspect.java | 41 +++-
.../server/controller/ConfigController.java | 2 +
.../ConfigDynamicMeterRefreshService.java | 64 ++++++
.../config/server/monitor/MemoryMonitor.java | 4 +
.../config/server/monitor/MetricsMonitor.java | 98 ++++++--
.../ConfigSubscriberMetricsCollector.java | 45 ++++
.../remote/ConfigChangeListenContext.java | 9 +
.../server/service/LongPollingService.java | 4 +
.../service/notify/AsyncNotifyService.java | 3 +
.../nacos/core/monitor/MetricsMonitor.java | 34 ++-
.../core/monitor/NacosMeterRegistry.java | 57 -----
.../monitor/NacosMeterRegistryCenter.java | 165 +++++++++++++
.../core/monitor/MetricsMonitorTest.java | 26 ++-
.../nacos/naming/monitor/MetricsMonitor.java | 94 +++++++-
.../NamingDynamicMeterRefreshService.java | 64 ++++++
.../NamingSubAndPubMetricsCollector.java | 83 +++++++
.../PushPendingTaskCountMetricsCollector.java | 50 ++++
...ServiceEventQueueSizeMetricsCollector.java | 52 +++++
.../v2/NamingSubscriberServiceV2Impl.java | 6 +
.../v2/hook/NacosMonitorPushResultHook.java | 3 +
.../naming/monitor/MetricsMonitorTest.java | 19 ++
.../hook/NacosMonitorPushResultHookTest.java | 102 ++++++++
24 files changed, 1249 insertions(+), 102 deletions(-)
create mode 100644 common/src/main/java/com/alibaba/nacos/common/utils/TopnCounterMetricsContainer.java
create mode 100644 common/src/test/java/com/alibaba/nacos/common/utils/TopnCounterMetricsContainerTest.java
create mode 100644 config/src/main/java/com/alibaba/nacos/config/server/monitor/ConfigDynamicMeterRefreshService.java
create mode 100644 config/src/main/java/com/alibaba/nacos/config/server/monitor/collector/ConfigSubscriberMetricsCollector.java
delete mode 100644 core/src/main/java/com/alibaba/nacos/core/monitor/NacosMeterRegistry.java
create mode 100644 core/src/main/java/com/alibaba/nacos/core/monitor/NacosMeterRegistryCenter.java
create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/monitor/NamingDynamicMeterRefreshService.java
create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/NamingSubAndPubMetricsCollector.java
create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/PushPendingTaskCountMetricsCollector.java
create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/ServiceEventQueueSizeMetricsCollector.java
create mode 100644 naming/src/test/java/com/alibaba/nacos/naming/push/v2/hook/NacosMonitorPushResultHookTest.java
diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/TopnCounterMetricsContainer.java b/common/src/main/java/com/alibaba/nacos/common/utils/TopnCounterMetricsContainer.java
new file mode 100644
index 000000000..2f0b47617
--- /dev/null
+++ b/common/src/main/java/com/alibaba/nacos/common/utils/TopnCounterMetricsContainer.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 1999-2022 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.common.utils;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * container for top n counter metrics, increment and remove cost O(1) time.
+ *
+ * @author liuyixiao
+ */
+public class TopnCounterMetricsContainer {
+
+ /**
+ * dataId -> count.
+ */
+ private ConcurrentHashMap dataCount;
+
+ /**
+ * count -> node.
+ */
+ private ConcurrentHashMap specifiedCountDataIdSets;
+
+ private DoublyLinkedNode dummyHead;
+
+ public TopnCounterMetricsContainer() {
+ dataCount = new ConcurrentHashMap<>();
+ specifiedCountDataIdSets = new ConcurrentHashMap<>();
+ dummyHead = new DoublyLinkedNode(null, null, null, -1);
+ dummyHead.next = new DoublyLinkedNode(null, dummyHead, new ConcurrentHashSet<>(), 0);
+ specifiedCountDataIdSets.put(0, dummyHead.next);
+ }
+
+ public List> getTopNCounter(int n) {
+ List> topnCounter = new LinkedList<>();
+ DoublyLinkedNode curr = dummyHead;
+ while (curr.next != null && topnCounter.size() < n) {
+ for (String dataId : curr.next.dataSet) {
+ // use inner AtomicInteger to reflect change to prometheus
+ topnCounter.add(new Pair<>(dataId, dataCount.get(dataId)));
+ if (topnCounter.size() == n) {
+ break;
+ }
+ }
+ curr = curr.next;
+ }
+ return topnCounter;
+ }
+
+ /**
+ * put(String dataId, 0).
+ *
+ * @param dataId data name or data key.
+ */
+ public void put(String dataId) {
+ put(dataId, 0);
+ }
+
+ /**
+ * put new data into container, if already exist, update it.
+ * this method could be slow (O(N)), most time use increment.
+ *
+ * @param dataId data name or data key.
+ * @param count data count.
+ */
+ public void put(String dataId, int count) {
+ if (dataCount.containsKey(dataId)) {
+ removeFromSpecifiedCountDataIdSets(dataId);
+ dataCount.get(dataId).set(count);
+ } else {
+ dataCount.put(dataId, new AtomicInteger(count));
+ }
+ insertIntoSpecifiedCountDataIdSets(dataId, count);
+ }
+
+ /**
+ * get data count by dataId.
+ *
+ * @param dataId data name or data key.
+ * @return data count or -1 if not exist.
+ */
+ public int get(String dataId) {
+ if (dataCount.containsKey(dataId)) {
+ return dataCount.get(dataId).get();
+ }
+ return -1;
+ }
+
+ /**
+ * increment the count of dataId.
+ *
+ * @param dataId data name or data key.
+ */
+ public void increment(String dataId) {
+ if (!dataCount.containsKey(dataId)) {
+ put(dataId);
+ }
+ DoublyLinkedNode prev = removeFromSpecifiedCountDataIdSets(dataId);
+ int newCount = dataCount.get(dataId).incrementAndGet();
+ if (!isDummyHead(prev) && prev.count == newCount) {
+ insertIntoSpecifiedCountDataIdSets(dataId, prev);
+ } else {
+ // prev.count > newCount
+ DoublyLinkedNode newNode = new DoublyLinkedNode(prev.next, prev, new ConcurrentHashSet<>(), newCount);
+ if (prev.next != null) {
+ prev.next.prev = newNode;
+ }
+ prev.next = newNode;
+ newNode.dataSet.add(dataId);
+ specifiedCountDataIdSets.put(newCount, newNode);
+ }
+ }
+
+ /**
+ * remove data.
+ *
+ * @param dataId data name or data key.
+ * @return data count or null if data is not exist.
+ */
+ public AtomicInteger remove(String dataId) {
+ if (dataCount.containsKey(dataId)) {
+ removeFromSpecifiedCountDataIdSets(dataId);
+ return dataCount.remove(dataId);
+ }
+ return null;
+ }
+
+ /**
+ * remove all data.
+ */
+ public void removeAll() {
+ for (String dataId : dataCount.keySet()) {
+ removeFromSpecifiedCountDataIdSets(dataId);
+ }
+ dataCount.clear();
+ }
+
+ private DoublyLinkedNode removeFromSpecifiedCountDataIdSets(String dataId) {
+ int count = dataCount.get(dataId).get();
+ DoublyLinkedNode node = specifiedCountDataIdSets.get(count);
+ node.dataSet.remove(dataId);
+ // keep the 0 count node.
+ if (node.dataSet.size() == 0 && node.count != 0) {
+ node.prev.next = node.next;
+ if (node.next != null) {
+ node.next.prev = node.prev;
+ }
+ specifiedCountDataIdSets.remove(node.count);
+ }
+ return node.prev;
+ }
+
+ private void insertIntoSpecifiedCountDataIdSets(String dataId, int count) {
+ if (specifiedCountDataIdSets.containsKey(count)) {
+ specifiedCountDataIdSets.get(count).dataSet.add(dataId);
+ } else {
+ DoublyLinkedNode prev = dummyHead;
+ while (prev.next != null) {
+ if (prev.next.count < count) {
+ break;
+ } else {
+ prev = prev.next;
+ }
+ }
+ DoublyLinkedNode newNode = new DoublyLinkedNode(prev.next, prev, new ConcurrentHashSet<>(), count);
+ if (prev.next != null) {
+ prev.next.prev = newNode;
+ }
+ prev.next = newNode;
+ newNode.dataSet.add(dataId);
+ specifiedCountDataIdSets.put(count, newNode);
+ }
+ }
+
+ private void insertIntoSpecifiedCountDataIdSets(String dataId, DoublyLinkedNode targetSet) {
+ targetSet.dataSet.add(dataId);
+ }
+
+ private boolean isDummyHead(DoublyLinkedNode node) {
+ return node.count == -1;
+ }
+
+ private class DoublyLinkedNode {
+
+ public DoublyLinkedNode next;
+
+ public DoublyLinkedNode prev;
+
+ public ConcurrentHashSet dataSet;
+
+ public int count;
+
+ public DoublyLinkedNode(DoublyLinkedNode next, DoublyLinkedNode prev, ConcurrentHashSet dataSet, int count) {
+ this.next = next;
+ this.prev = prev;
+ this.dataSet = dataSet;
+ this.count = count;
+ }
+ }
+}
diff --git a/common/src/test/java/com/alibaba/nacos/common/utils/TopnCounterMetricsContainerTest.java b/common/src/test/java/com/alibaba/nacos/common/utils/TopnCounterMetricsContainerTest.java
new file mode 100644
index 000000000..b2d867b4d
--- /dev/null
+++ b/common/src/test/java/com/alibaba/nacos/common/utils/TopnCounterMetricsContainerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 1999-2022 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.common.utils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * unit test for TopNCounterMetricsContainer.
+ *
+ * @author liuyixiao
+ */
+public class TopnCounterMetricsContainerTest {
+
+ private TopnCounterMetricsContainer topnCounterMetricsContainer;
+
+ @Before
+ public void setUp() {
+ topnCounterMetricsContainer = new TopnCounterMetricsContainer();
+ }
+
+ @Test
+ public void testPut() {
+ topnCounterMetricsContainer.put("test");
+ Assert.assertEquals(0, topnCounterMetricsContainer.get("test"));
+ topnCounterMetricsContainer.put("test1", 1);
+ Assert.assertEquals(1, topnCounterMetricsContainer.get("test1"));
+ }
+
+ @Test
+ public void testIncrement() {
+ topnCounterMetricsContainer.put("test", 0);
+ topnCounterMetricsContainer.increment("test");
+ Assert.assertEquals(1, topnCounterMetricsContainer.get("test"));
+ }
+
+ @Test
+ public void testRemove() {
+ topnCounterMetricsContainer.put("test");
+ Assert.assertEquals(0, topnCounterMetricsContainer.get("test"));
+ topnCounterMetricsContainer.remove("test");
+ Assert.assertEquals(-1, topnCounterMetricsContainer.get("test"));
+ }
+
+ @Test
+ public void testRemoveAll() {
+ topnCounterMetricsContainer.put("test");
+ topnCounterMetricsContainer.put("test1");
+ topnCounterMetricsContainer.put("test2");
+ topnCounterMetricsContainer.removeAll();
+ Assert.assertEquals(-1, topnCounterMetricsContainer.get("test"));
+ Assert.assertEquals(-1, topnCounterMetricsContainer.get("test1"));
+ Assert.assertEquals(-1, topnCounterMetricsContainer.get("test2"));
+ }
+
+ @Test
+ public void testGetTopNCounterAndRemoveAll() {
+ final int N = 10;
+ String dataIds = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ List> dataList = new ArrayList<>();
+ for (int i = 0; i < dataIds.length(); i++) {
+ topnCounterMetricsContainer.put(dataIds.substring(i, i + 1));
+ dataList.add(new Pair<>(dataIds.substring(i, i + 1), new AtomicInteger()));
+ }
+ Random random = new Random();
+ for (int i = 0; i < 10000; i++) {
+ int j = random.nextInt(dataIds.length());
+ topnCounterMetricsContainer.increment(dataIds.substring(j, j + 1));
+ dataList.get(j).getSecond().incrementAndGet();
+ }
+ boolean right = true;
+ Collections.sort(dataList, (a, b) -> b.getSecond().get() - a.getSecond().get());
+ List> result = topnCounterMetricsContainer.getTopNCounter(N);
+ for (Pair item : result) {
+ // ensure every top N count is greater than (N+1)th greatest.
+ if (item.getSecond().get() < dataList.get(N).getSecond().get()) {
+ right = false;
+ break;
+ }
+ }
+ Assert.assertTrue(right);
+ topnCounterMetricsContainer.removeAll();
+ for (int i = 0; i < dataIds.length(); i++) {
+ Assert.assertEquals(-1, topnCounterMetricsContainer.get(dataIds.substring(i, i + 1)));
+ }
+ Assert.assertEquals(0, topnCounterMetricsContainer.getTopNCounter(N).size());
+ }
+}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/aspect/RequestLogAspect.java b/config/src/main/java/com/alibaba/nacos/config/server/aspect/RequestLogAspect.java
index 73e6d7e80..67832d02b 100755
--- a/config/src/main/java/com/alibaba/nacos/config/server/aspect/RequestLogAspect.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/aspect/RequestLogAspect.java
@@ -37,6 +37,8 @@ import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* * Created with IntelliJ IDEA. User: dingjoey Date: 13-12-12 Time: 21:12 client api && sdk api 请求日志打点逻辑
@@ -110,8 +112,11 @@ public class RequestLogAspect {
final String md5 =
request.getContent() == null ? null : MD5Utils.md5Hex(request.getContent(), Constants.ENCODE);
MetricsMonitor.getPublishMonitor().incrementAndGet();
- return logClientRequestRpc("publish", pjp, request, meta, request.getDataId(), request.getGroup(),
- request.getTenant(), md5);
+ AtomicLong rtHolder = new AtomicLong();
+ Object retVal = logClientRequestRpc("publish", pjp, request, meta, request.getDataId(), request.getGroup(),
+ request.getTenant(), md5, rtHolder);
+ MetricsMonitor.getWriteConfigRpcRtTimer().record(rtHolder.get(), TimeUnit.MILLISECONDS);
+ return retVal;
}
/**
@@ -122,7 +127,10 @@ public class RequestLogAspect {
HttpServletResponse response, String dataId, String group, String tenant, String content) throws Throwable {
final String md5 = content == null ? null : MD5Utils.md5Hex(content, Constants.ENCODE);
MetricsMonitor.getPublishMonitor().incrementAndGet();
- return logClientRequest("publish", pjp, request, response, dataId, group, tenant, md5);
+ AtomicLong rtHolder = new AtomicLong();
+ Object retVal = logClientRequest("publish", pjp, request, response, dataId, group, tenant, md5, rtHolder);
+ MetricsMonitor.getWriteConfigRtTimer().record(rtHolder.get(), TimeUnit.MILLISECONDS);
+ return retVal;
}
/**
@@ -131,7 +139,7 @@ public class RequestLogAspect {
@Around(CLIENT_INTERFACE_REMOVE_ALL_CONFIG)
public Object interfaceRemoveAll(ProceedingJoinPoint pjp, HttpServletRequest request, HttpServletResponse response,
String dataId, String group, String tenant) throws Throwable {
- return logClientRequest("remove", pjp, request, response, dataId, group, tenant, null);
+ return logClientRequest("remove", pjp, request, response, dataId, group, tenant, null, null);
}
/**
@@ -141,7 +149,7 @@ public class RequestLogAspect {
public Object interfaceRemoveAllRpc(ProceedingJoinPoint pjp, ConfigRemoveRequest request, RequestMeta meta)
throws Throwable {
return logClientRequestRpc("remove", pjp, request, meta, request.getDataId(), request.getGroup(),
- request.getTenant(), null);
+ request.getTenant(), null, null);
}
/**
@@ -153,7 +161,10 @@ public class RequestLogAspect {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
final String md5 = ConfigCacheService.getContentMd5(groupKey);
MetricsMonitor.getConfigMonitor().incrementAndGet();
- return logClientRequest("get", pjp, request, response, dataId, group, tenant, md5);
+ AtomicLong rtHolder = new AtomicLong();
+ Object retVal = logClientRequest("get", pjp, request, response, dataId, group, tenant, md5, rtHolder);
+ MetricsMonitor.getReadConfigRtTimer().record(rtHolder.get(), TimeUnit.MILLISECONDS);
+ return retVal;
}
/**
@@ -165,20 +176,26 @@ public class RequestLogAspect {
final String groupKey = GroupKey2.getKey(request.getDataId(), request.getGroup(), request.getTenant());
final String md5 = ConfigCacheService.getContentMd5(groupKey);
MetricsMonitor.getConfigMonitor().incrementAndGet();
- return logClientRequestRpc("get", pjp, request, meta, request.getDataId(), request.getGroup(),
- request.getTenant(), md5);
+ AtomicLong rtHolder = new AtomicLong();
+ Object retVal = logClientRequestRpc("get", pjp, request, meta, request.getDataId(), request.getGroup(),
+ request.getTenant(), md5, rtHolder);
+ MetricsMonitor.getReadConfigRpcRtTimer().record(rtHolder.get(), TimeUnit.MILLISECONDS);
+ return retVal;
}
/**
* Client api request log rt | status | requestIp | opType | dataId | group | datumId | md5.
*/
private Object logClientRequest(String requestType, ProceedingJoinPoint pjp, HttpServletRequest request,
- HttpServletResponse response, String dataId, String group, String tenant, String md5) throws Throwable {
+ HttpServletResponse response, String dataId, String group, String tenant, String md5, AtomicLong rtHolder) throws Throwable {
final String requestIp = RequestUtil.getRemoteIp(request);
String appName = request.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
final long st = System.currentTimeMillis();
Object retVal = pjp.proceed();
final long rt = System.currentTimeMillis() - st;
+ if (rtHolder != null) {
+ rtHolder.set(rt);
+ }
// rt | status | requestIp | opType | dataId | group | datumId | md5 |
// appName
LogUtil.CLIENT_LOG
@@ -191,12 +208,15 @@ public class RequestLogAspect {
* Client api request log rt | status | requestIp | opType | dataId | group | datumId | md5.
*/
private Object logClientRequestRpc(String requestType, ProceedingJoinPoint pjp, Request request, RequestMeta meta,
- String dataId, String group, String tenant, String md5) throws Throwable {
+ String dataId, String group, String tenant, String md5, AtomicLong rtHolder) throws Throwable {
final String requestIp = meta.getClientIp();
String appName = request.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
final long st = System.currentTimeMillis();
Response retVal = (Response) pjp.proceed();
final long rt = System.currentTimeMillis() - st;
+ if (rtHolder != null) {
+ rtHolder.set(rt);
+ }
// rt | status | requestIp | opType | dataId | group | datumId | md5 |
// appName
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}|{}|{}", rt,
@@ -224,5 +244,4 @@ public class RequestLogAspect {
request.isListen(), "", "", appName);
return retVal;
}
-
}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java b/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java
index d19e2170f..e238003b4 100644
--- a/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java
@@ -38,6 +38,7 @@ import com.alibaba.nacos.config.server.model.Page;
import com.alibaba.nacos.config.server.model.SameConfigPolicy;
import com.alibaba.nacos.config.server.model.SampleResult;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
+import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.result.code.ResultCodeEnum;
import com.alibaba.nacos.config.server.service.AggrWhitelist;
import com.alibaba.nacos.config.server.service.ConfigChangePublisher;
@@ -412,6 +413,7 @@ public class ConfigController {
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam("pageNo") int pageNo, @RequestParam("pageSize") int pageSize) {
+ MetricsMonitor.getFuzzySearchMonitor().incrementAndGet();
Map configAdvanceInfo = new HashMap<>(50);
if (StringUtils.isNotBlank(appName)) {
configAdvanceInfo.put("appName", appName);
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/monitor/ConfigDynamicMeterRefreshService.java b/config/src/main/java/com/alibaba/nacos/config/server/monitor/ConfigDynamicMeterRefreshService.java
new file mode 100644
index 000000000..fcb0b7e3f
--- /dev/null
+++ b/config/src/main/java/com/alibaba/nacos/config/server/monitor/ConfigDynamicMeterRefreshService.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 1999-2022 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.config.server.monitor;
+
+import com.alibaba.nacos.common.utils.Pair;
+import com.alibaba.nacos.core.monitor.NacosMeterRegistryCenter;
+import io.micrometer.core.instrument.ImmutableTag;
+import io.micrometer.core.instrument.Tag;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * dynamic meter refresh service.
+ *
+ * @author liuyixiao
+ */
+@Service
+public class ConfigDynamicMeterRefreshService {
+
+ private static final String TOPN_CONFIG_CHANGE_REGISTRY = NacosMeterRegistryCenter.TOPN_CONFIG_CHANGE_REGISTRY;
+
+ private static final int CONFIG_CHANGE_N = 10;
+
+ /**
+ * refresh config change count top n per 30s.
+ */
+ @Scheduled(cron = "0/30 * * * * *")
+ public void refreshTopnConfigChangeCount() {
+ NacosMeterRegistryCenter.clear(TOPN_CONFIG_CHANGE_REGISTRY);
+ List> topnConfigChangeCount = MetricsMonitor.getConfigChangeCount()
+ .getTopNCounter(CONFIG_CHANGE_N);
+ for (Pair configChangeCount : topnConfigChangeCount) {
+ List tags = new ArrayList<>();
+ tags.add(new ImmutableTag("config", configChangeCount.getFirst()));
+ NacosMeterRegistryCenter.gauge(TOPN_CONFIG_CHANGE_REGISTRY, "config_change_count", tags, configChangeCount.getSecond());
+ }
+ }
+
+ /**
+ * reset config change count to 0 every week.
+ */
+ @Scheduled(cron = "0 0 0 ? * 1")
+ public void resetTopnConfigChangeCount() {
+ MetricsMonitor.getConfigChangeCount().removeAll();
+ }
+}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/monitor/MemoryMonitor.java b/config/src/main/java/com/alibaba/nacos/config/server/monitor/MemoryMonitor.java
index 4766d869f..b3360fe6d 100755
--- a/config/src/main/java/com/alibaba/nacos/config/server/monitor/MemoryMonitor.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/monitor/MemoryMonitor.java
@@ -48,9 +48,13 @@ public class MemoryMonitor {
private static final long DELAY_SECONDS = 10;
+ /**
+ * reset some metrics to 0 every day.
+ */
@Scheduled(cron = "0 0 0 * * ?")
public void clear() {
MetricsMonitor.getConfigMonitor().set(0);
MetricsMonitor.getPublishMonitor().set(0);
+ MetricsMonitor.getFuzzySearchMonitor().set(0);
}
}
\ No newline at end of file
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/monitor/MetricsMonitor.java b/config/src/main/java/com/alibaba/nacos/config/server/monitor/MetricsMonitor.java
index c12de68ec..9c0a08081 100644
--- a/config/src/main/java/com/alibaba/nacos/config/server/monitor/MetricsMonitor.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/monitor/MetricsMonitor.java
@@ -16,14 +16,16 @@
package com.alibaba.nacos.config.server.monitor;
+import com.alibaba.nacos.common.utils.TopnCounterMetricsContainer;
+import com.alibaba.nacos.core.monitor.NacosMeterRegistryCenter;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.ImmutableTag;
-import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -33,19 +35,21 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class MetricsMonitor {
+ private static final String METER_REGISTRY = NacosMeterRegistryCenter.CONFIG_STABLE_REGISTRY;
+
private static AtomicInteger getConfig = new AtomicInteger();
private static AtomicInteger publish = new AtomicInteger();
/**
- * task for notify config change to sub client of http long polling..
+ * task for notify config change to sub client of http long polling.
*/
private static AtomicInteger longPolling = new AtomicInteger();
private static AtomicInteger configCount = new AtomicInteger();
/**
- * task for ntify config change to cluster server.
+ * task for notify config change to cluster server.
*/
private static AtomicInteger notifyTask = new AtomicInteger();
@@ -56,43 +60,74 @@ public class MetricsMonitor {
private static AtomicInteger dumpTask = new AtomicInteger();
+ /**
+ * config fuzzy search count.
+ */
+ private static AtomicInteger fuzzySearch = new AtomicInteger();
+
+ /**
+ * version -> client config subscriber count.
+ */
+ private static ConcurrentHashMap configSubscriber = new ConcurrentHashMap<>();
+
+ /**
+ * config change count.
+ */
+ private static TopnCounterMetricsContainer configChangeCount = new TopnCounterMetricsContainer();
+
static {
ImmutableTag immutableTag = new ImmutableTag("module", "config");
List tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "getConfig"));
- Metrics.gauge("nacos_monitor", tags, getConfig);
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, getConfig);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "publish"));
- Metrics.gauge("nacos_monitor", tags, publish);
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, publish);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "longPolling"));
- Metrics.gauge("nacos_monitor", tags, longPolling);
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, longPolling);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "configCount"));
- Metrics.gauge("nacos_monitor", tags, configCount);
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, configCount);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "notifyTask"));
- Metrics.gauge("nacos_monitor", tags, notifyTask);
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, notifyTask);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "notifyClientTask"));
- Metrics.gauge("nacos_monitor", tags, notifyClientTask);
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, notifyClientTask);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "dumpTask"));
- Metrics.gauge("nacos_monitor", tags, dumpTask);
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, dumpTask);
+
+ tags = new ArrayList<>();
+ tags.add(immutableTag);
+ tags.add(new ImmutableTag("name", "fuzzySearch"));
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, fuzzySearch);
+
+ configSubscriber.put("v1", new AtomicInteger(0));
+ configSubscriber.put("v2", new AtomicInteger(0));
+
+ tags = new ArrayList<>();
+ tags.add(new ImmutableTag("version", "v1"));
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_config_subscriber", tags, configSubscriber.get("v1"));
+
+ tags = new ArrayList<>();
+ tags.add(new ImmutableTag("version", "v2"));
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_config_subscriber", tags, configSubscriber.get("v2"));
}
public static AtomicInteger getConfigMonitor() {
@@ -123,28 +158,59 @@ public class MetricsMonitor {
return dumpTask;
}
+ public static AtomicInteger getFuzzySearchMonitor() {
+ return fuzzySearch;
+ }
+
+ public static AtomicInteger getConfigSubscriberMonitor(String version) {
+ return configSubscriber.get(version);
+ }
+
+ public static TopnCounterMetricsContainer getConfigChangeCount() {
+ return configChangeCount;
+ }
+
+ public static Timer getReadConfigRtTimer() {
+ return NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_timer", "module", "config", "name", "readConfigRt");
+ }
+
+ public static Timer getReadConfigRpcRtTimer() {
+ return NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_timer", "module", "config", "name", "readConfigRpcRt");
+ }
+
+ public static Timer getWriteConfigRtTimer() {
+ return NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_timer", "module", "config", "name", "writeConfigRt");
+ }
+
+ public static Timer getWriteConfigRpcRtTimer() {
+ return NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_timer", "module", "config", "name", "writeConfigRpcRt");
+ }
+
public static Timer getNotifyRtTimer() {
- return Metrics.timer("nacos_timer", "module", "config", "name", "notifyRt");
+ return NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_timer", "module", "config", "name", "notifyRt");
}
public static Counter getIllegalArgumentException() {
- return Metrics.counter("nacos_exception", "module", "config", "name", "illegalArgument");
+ return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "config", "name", "illegalArgument");
}
public static Counter getNacosException() {
- return Metrics.counter("nacos_exception", "module", "config", "name", "nacos");
+ return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "config", "name", "nacos");
}
public static Counter getDbException() {
- return Metrics.counter("nacos_exception", "module", "config", "name", "db");
+ return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "config", "name", "db");
}
public static Counter getConfigNotifyException() {
- return Metrics.counter("nacos_exception", "module", "config", "name", "configNotify");
+ return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "config", "name", "configNotify");
}
public static Counter getUnhealthException() {
- return Metrics.counter("nacos_exception", "module", "config", "name", "unhealth");
+ return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "config", "name", "unhealth");
}
+ public static void incrementConfigChangeCount(String tenant, String group, String dataId) {
+ configChangeCount.increment(tenant + "@" + group + "@" + dataId);
+ }
}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/monitor/collector/ConfigSubscriberMetricsCollector.java b/config/src/main/java/com/alibaba/nacos/config/server/monitor/collector/ConfigSubscriberMetricsCollector.java
new file mode 100644
index 000000000..ff02401a6
--- /dev/null
+++ b/config/src/main/java/com/alibaba/nacos/config/server/monitor/collector/ConfigSubscriberMetricsCollector.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 1999-2022 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.config.server.monitor.collector;
+
+import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
+import com.alibaba.nacos.config.server.remote.ConfigChangeListenContext;
+import com.alibaba.nacos.config.server.service.LongPollingService;
+import com.alibaba.nacos.config.server.utils.ConfigExecutor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * v1 and v2 config subscriber metrics collector.
+ *
+ * @author liuyixiao
+ */
+@Service
+public class ConfigSubscriberMetricsCollector {
+
+ private static final long DELAY_SECONDS = 5;
+
+ @Autowired
+ public ConfigSubscriberMetricsCollector(LongPollingService longPollingService, ConfigChangeListenContext configChangeListenContext) {
+ ConfigExecutor.scheduleConfigTask(() -> {
+ MetricsMonitor.getConfigSubscriberMonitor("v1").set(longPollingService.getSubscriberCount());
+ MetricsMonitor.getConfigSubscriberMonitor("v2").set(configChangeListenContext.getConnectionCount());
+ }, DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);
+ }
+}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeListenContext.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeListenContext.java
index 037c794b7..027c85a58 100644
--- a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeListenContext.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeListenContext.java
@@ -173,4 +173,13 @@ public class ConfigChangeListenContext {
return groupKeyContexts == null ? null : groupKeyContexts.get(groupKey);
}
+ /**
+ * get connection count.
+ *
+ * @return count of long connections.
+ */
+ public int getConnectionCount() {
+ return connectionIdContext.size();
+ }
+
}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java
index b6c65f93a..359244687 100755
--- a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java
@@ -530,4 +530,8 @@ public class LongPollingService {
public void setRetainIps(Map retainIps) {
this.retainIps = retainIps;
}
+
+ public int getSubscriberCount() {
+ return allSubs.size();
+ }
}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java
index 7c0eadf96..198327efb 100644
--- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java
@@ -103,6 +103,9 @@ public class AsyncNotifyService {
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
+
+ MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);
+
Collection ipList = memberManager.allMembers();
// In fact, any type of queue here can be
diff --git a/core/src/main/java/com/alibaba/nacos/core/monitor/MetricsMonitor.java b/core/src/main/java/com/alibaba/nacos/core/monitor/MetricsMonitor.java
index dce433015..747fca885 100644
--- a/core/src/main/java/com/alibaba/nacos/core/monitor/MetricsMonitor.java
+++ b/core/src/main/java/com/alibaba/nacos/core/monitor/MetricsMonitor.java
@@ -18,7 +18,6 @@ package com.alibaba.nacos.core.monitor;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.ImmutableTag;
-import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
@@ -33,6 +32,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public final class MetricsMonitor {
+ private static final String METER_REGISTRY = NacosMeterRegistryCenter.CORE_STABLE_REGISTRY;
+
private static final DistributionSummary RAFT_READ_INDEX_FAILED;
private static final DistributionSummary RAFT_FROM_LEADER;
@@ -44,16 +45,31 @@ public final class MetricsMonitor {
private static AtomicInteger longConnection = new AtomicInteger();
static {
- RAFT_READ_INDEX_FAILED = NacosMeterRegistry.summary("protocol", "raft_read_index_failed");
- RAFT_FROM_LEADER = NacosMeterRegistry.summary("protocol", "raft_read_from_leader");
-
- RAFT_APPLY_LOG_TIMER = NacosMeterRegistry.timer("protocol", "raft_apply_log_timer");
- RAFT_APPLY_READ_TIMER = NacosMeterRegistry.timer("protocol", "raft_apply_read_timer");
-
+ ImmutableTag immutableTag = new ImmutableTag("module", "core");
List tags = new ArrayList<>();
- tags.add(new ImmutableTag("module", "config"));
+ tags.add(immutableTag);
+ tags.add(new ImmutableTag("name", "raft_read_index_failed"));
+ RAFT_READ_INDEX_FAILED = NacosMeterRegistryCenter.summary(METER_REGISTRY, "nacos_monitor", tags);
+
+ tags = new ArrayList<>();
+ tags.add(immutableTag);
+ tags.add(new ImmutableTag("name", "raft_read_from_leader"));
+ RAFT_FROM_LEADER = NacosMeterRegistryCenter.summary(METER_REGISTRY, "nacos_monitor", tags);
+
+ tags = new ArrayList<>();
+ tags.add(immutableTag);
+ tags.add(new ImmutableTag("name", "raft_apply_log_timer"));
+ RAFT_APPLY_LOG_TIMER = NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_monitor", tags);
+
+ tags = new ArrayList<>();
+ tags.add(immutableTag);
+ tags.add(new ImmutableTag("name", "raft_apply_read_timer"));
+ RAFT_APPLY_READ_TIMER = NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_monitor", tags);
+
+ tags = new ArrayList<>();
+ tags.add(immutableTag);
tags.add(new ImmutableTag("name", "longConnection"));
- Metrics.gauge("nacos_monitor", tags, longConnection);
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, longConnection);
}
diff --git a/core/src/main/java/com/alibaba/nacos/core/monitor/NacosMeterRegistry.java b/core/src/main/java/com/alibaba/nacos/core/monitor/NacosMeterRegistry.java
deleted file mode 100644
index c2686c67e..000000000
--- a/core/src/main/java/com/alibaba/nacos/core/monitor/NacosMeterRegistry.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 1999-2018 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.nacos.core.monitor;
-
-import io.micrometer.core.instrument.DistributionSummary;
-import io.micrometer.core.instrument.ImmutableTag;
-import io.micrometer.core.instrument.Tag;
-import io.micrometer.core.instrument.Timer;
-import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Metrics unified usage center.
- *
- * @author liaochuntao
- */
-@SuppressWarnings("all")
-public final class NacosMeterRegistry {
-
- private static final CompositeMeterRegistry METER_REGISTRY = new CompositeMeterRegistry();
-
- public static DistributionSummary summary(String module, String name) {
- ImmutableTag moduleTag = new ImmutableTag("module", module);
- List tags = new ArrayList<>();
- tags.add(moduleTag);
- tags.add(new ImmutableTag("name", name));
- return METER_REGISTRY.summary("nacos_monitor", tags);
- }
-
- public static Timer timer(String module, String name) {
- ImmutableTag moduleTag = new ImmutableTag("module", module);
- List tags = new ArrayList<>();
- tags.add(moduleTag);
- tags.add(new ImmutableTag("name", name));
- return METER_REGISTRY.timer("nacos_monitor", tags);
- }
-
- public static CompositeMeterRegistry getMeterRegistry() {
- return METER_REGISTRY;
- }
-}
diff --git a/core/src/main/java/com/alibaba/nacos/core/monitor/NacosMeterRegistryCenter.java b/core/src/main/java/com/alibaba/nacos/core/monitor/NacosMeterRegistryCenter.java
new file mode 100644
index 000000000..d0c5c4dd4
--- /dev/null
+++ b/core/src/main/java/com/alibaba/nacos/core/monitor/NacosMeterRegistryCenter.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.core.monitor;
+
+import com.alibaba.nacos.core.utils.Loggers;
+import com.alibaba.nacos.sys.utils.ApplicationUtils;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.Timer;
+import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Metrics unified usage center.
+ *
+ * @author liuyixiao
+ */
+@SuppressWarnings("all")
+public final class NacosMeterRegistryCenter {
+
+ // stable registries.
+
+ public static final String CORE_STABLE_REGISTRY = "CORE_STABLE_REGISTRY";
+
+ public static final String CONFIG_STABLE_REGISTRY = "CONFIG_STABLE_REGISTRY";
+
+ public static final String NAMING_STABLE_REGISTRY = "NAMING_STABLE_REGISTRY";
+
+ // dynamic registries.
+
+ public static final String TOPN_CONFIG_CHANGE_REGISTRY = "TOPN_CONFIG_CHANGE_REGISTRY";
+
+ public static final String TOPN_SERVICE_CHANGE_REGISTRY = "TOPN_SERVICE_CHANGE_REGISTRY";
+
+ private static final ConcurrentHashMap METER_REGISTRIES = new ConcurrentHashMap<>();
+
+ private static PrometheusMeterRegistry PROMETHEUS_METER_REGISTRY = null;
+
+ static {
+ try {
+ PROMETHEUS_METER_REGISTRY = ApplicationUtils.getBean(PrometheusMeterRegistry.class);
+ } catch (Throwable t) {
+ Loggers.CORE.warn("Metrics init failed :", t);
+ }
+
+ CompositeMeterRegistry compositeMeterRegistry;
+
+ compositeMeterRegistry = new CompositeMeterRegistry();
+ if (PROMETHEUS_METER_REGISTRY != null) {
+ compositeMeterRegistry.add(PROMETHEUS_METER_REGISTRY);
+ }
+ METER_REGISTRIES.put(CORE_STABLE_REGISTRY, compositeMeterRegistry);
+
+ compositeMeterRegistry = new CompositeMeterRegistry();
+ if (PROMETHEUS_METER_REGISTRY != null) {
+ compositeMeterRegistry.add(PROMETHEUS_METER_REGISTRY);
+ }
+ METER_REGISTRIES.put(CONFIG_STABLE_REGISTRY, compositeMeterRegistry);
+
+ compositeMeterRegistry = new CompositeMeterRegistry();
+ if (PROMETHEUS_METER_REGISTRY != null) {
+ compositeMeterRegistry.add(PROMETHEUS_METER_REGISTRY);
+ }
+ METER_REGISTRIES.put(NAMING_STABLE_REGISTRY, compositeMeterRegistry);
+
+ compositeMeterRegistry = new CompositeMeterRegistry();
+ if (PROMETHEUS_METER_REGISTRY != null) {
+ compositeMeterRegistry.add(PROMETHEUS_METER_REGISTRY);
+ }
+ METER_REGISTRIES.put(TOPN_CONFIG_CHANGE_REGISTRY, compositeMeterRegistry);
+
+ compositeMeterRegistry = new CompositeMeterRegistry();
+ if (PROMETHEUS_METER_REGISTRY != null) {
+ compositeMeterRegistry.add(PROMETHEUS_METER_REGISTRY);
+ }
+ METER_REGISTRIES.put(TOPN_SERVICE_CHANGE_REGISTRY, compositeMeterRegistry);
+ }
+
+ public static Counter counter(String registry, String name, Iterable tags) {
+ CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
+ if (compositeMeterRegistry != null) {
+ return METER_REGISTRIES.get(registry).counter(name, tags);
+ }
+ return null;
+ }
+
+ public static Counter counter(String registry, String name, String... tags) {
+ CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
+ if (compositeMeterRegistry != null) {
+ return METER_REGISTRIES.get(registry).counter(name, tags);
+ }
+ return null;
+ }
+
+ public static T gauge(String registry, String name, Iterable tags, T number) {
+ CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
+ if (compositeMeterRegistry != null) {
+ return METER_REGISTRIES.get(registry).gauge(name, tags, number);
+ }
+ return null;
+ }
+
+ public static Timer timer(String registry, String name, Iterable tags) {
+ CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
+ if (compositeMeterRegistry != null) {
+ return METER_REGISTRIES.get(registry).timer(name, tags);
+ }
+ return null;
+ }
+
+ public static Timer timer(String registry, String name, String... tags) {
+ CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
+ if (compositeMeterRegistry != null) {
+ return METER_REGISTRIES.get(registry).timer(name, tags);
+ }
+ return null;
+ }
+
+ public static DistributionSummary summary(String registry, String name, Iterable tags) {
+ CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
+ if (compositeMeterRegistry != null) {
+ return METER_REGISTRIES.get(registry).summary(name, tags);
+ }
+ return null;
+ }
+
+ public static DistributionSummary summary(String registry, String name, String... tags) {
+ CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
+ if (compositeMeterRegistry != null) {
+ return METER_REGISTRIES.get(registry).summary(name, tags);
+ }
+ return null;
+ }
+
+ public static void clear(String registry) {
+ METER_REGISTRIES.get(registry).clear();
+ }
+
+ /**
+ * Just for test. Don't register meter by getMeterRegistry.
+ *
+ * @param registry
+ * @return CompositeMeterRegistry in NacosMeterRegistryCenter.
+ */
+ public static CompositeMeterRegistry getMeterRegistry(String registry) {
+ return METER_REGISTRIES.get(registry);
+ }
+}
diff --git a/core/src/test/java/com/alibaba/nacos/core/monitor/MetricsMonitorTest.java b/core/src/test/java/com/alibaba/nacos/core/monitor/MetricsMonitorTest.java
index 01842c7fa..d217aa26d 100644
--- a/core/src/test/java/com/alibaba/nacos/core/monitor/MetricsMonitorTest.java
+++ b/core/src/test/java/com/alibaba/nacos/core/monitor/MetricsMonitorTest.java
@@ -16,26 +16,42 @@
package com.alibaba.nacos.core.monitor;
+import com.alibaba.nacos.sys.utils.ApplicationUtils;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.context.ConfigurableApplicationContext;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.mockito.Mockito.when;
+
/**
- * {@link MetricsMonitor} and {@link NacosMeterRegistry} unit tests.
+ * {@link MetricsMonitor} and {@link NacosMeterRegistryCenter} unit tests.
*
* @author chenglu
- * @date 2021-06-15 22:58
+ * @author liuyixiao
*/
+@RunWith(MockitoJUnitRunner.Silent.class)
public class MetricsMonitorTest {
+ @Mock
+ private ConfigurableApplicationContext context;
+
@Before
public void initMeterRegistry() {
- NacosMeterRegistry.getMeterRegistry().add(new SimpleMeterRegistry());
+ ApplicationUtils.injectContext(context);
+ when(context.getBean(PrometheusMeterRegistry.class)).thenReturn(null);
+ // add simple meterRegistry.
+ NacosMeterRegistryCenter.getMeterRegistry(NacosMeterRegistryCenter.CORE_STABLE_REGISTRY)
+ .add(new SimpleMeterRegistry());
}
@Test
@@ -63,7 +79,7 @@ public class MetricsMonitorTest {
raftApplyTimerLog.record(10, TimeUnit.SECONDS);
raftApplyTimerLog.record(20, TimeUnit.SECONDS);
Assert.assertEquals(0.5D, raftApplyTimerLog.totalTime(TimeUnit.MINUTES), 0.01);
-
+
Assert.assertEquals(30D, raftApplyTimerLog.totalTime(TimeUnit.SECONDS), 0.01);
}
@@ -73,7 +89,7 @@ public class MetricsMonitorTest {
raftApplyReadTimer.record(10, TimeUnit.SECONDS);
raftApplyReadTimer.record(20, TimeUnit.SECONDS);
Assert.assertEquals(0.5D, raftApplyReadTimer.totalTime(TimeUnit.MINUTES), 0.01);
-
+
Assert.assertEquals(30D, raftApplyReadTimer.totalTime(TimeUnit.SECONDS), 0.01);
}
}
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/monitor/MetricsMonitor.java b/naming/src/main/java/com/alibaba/nacos/naming/monitor/MetricsMonitor.java
index 31e39123a..ddd6e6121 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/monitor/MetricsMonitor.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/monitor/MetricsMonitor.java
@@ -18,15 +18,17 @@ package com.alibaba.nacos.naming.monitor;
import com.alibaba.nacos.naming.core.v2.pojo.BatchInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
+import com.alibaba.nacos.common.utils.TopnCounterMetricsContainer;
+import com.alibaba.nacos.core.monitor.NacosMeterRegistryCenter;
import com.alibaba.nacos.naming.misc.Loggers;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.ImmutableTag;
-import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,6 +39,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class MetricsMonitor {
+ private static final String METER_REGISTRY = NacosMeterRegistryCenter.NAMING_STABLE_REGISTRY;
+
private static final MetricsMonitor INSTANCE = new MetricsMonitor();
private final AtomicInteger mysqlHealthCheck = new AtomicInteger();
@@ -65,6 +69,29 @@ public class MetricsMonitor {
private final AtomicInteger failedPush = new AtomicInteger();
+ private final AtomicInteger emptyPush = new AtomicInteger();
+
+ private final AtomicInteger serviceSubscribedEventQueueSize = new AtomicInteger();
+
+ private final AtomicInteger serviceChangedEventQueueSize = new AtomicInteger();
+
+ private final AtomicInteger pushPendingTaskCount = new AtomicInteger();
+
+ /**
+ * version -> naming subscriber count.
+ */
+ private final ConcurrentHashMap namingSubscriber = new ConcurrentHashMap<>();
+
+ /**
+ * version -> naming publisher count.
+ */
+ private final ConcurrentHashMap namingPublisher = new ConcurrentHashMap<>();
+
+ /**
+ * topn service change count.
+ */
+ private final TopnCounterMetricsContainer serviceChangeCount = new TopnCounterMetricsContainer();
+
private MetricsMonitor() {
for (Field each : MetricsMonitor.class.getDeclaredFields()) {
if (Number.class.isAssignableFrom(each.getType())) {
@@ -76,13 +103,35 @@ public class MetricsMonitor {
}
}
}
+
+ namingSubscriber.put("v1", new AtomicInteger(0));
+ namingSubscriber.put("v2", new AtomicInteger(0));
+
+ List tags = new ArrayList<>();
+ tags.add(new ImmutableTag("version", "v1"));
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_naming_subscriber", tags, namingSubscriber.get("v1"));
+
+ tags = new ArrayList<>();
+ tags.add(new ImmutableTag("version", "v2"));
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_naming_subscriber", tags, namingSubscriber.get("v2"));
+
+ namingPublisher.put("v1", new AtomicInteger(0));
+ namingPublisher.put("v2", new AtomicInteger(0));
+
+ tags = new ArrayList<>();
+ tags.add(new ImmutableTag("version", "v1"));
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_naming_publisher", tags, namingPublisher.get("v1"));
+
+ tags = new ArrayList<>();
+ tags.add(new ImmutableTag("version", "v2"));
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_naming_publisher", tags, namingPublisher.get("v2"));
}
private void registerToMetrics(String name, T number) {
List tags = new ArrayList<>();
tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", name));
- Metrics.gauge("nacos_monitor", tags, number);
+ NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, number);
}
public static AtomicInteger getMysqlHealthCheckMonitor() {
@@ -129,14 +178,42 @@ public class MetricsMonitor {
return INSTANCE.failedPush;
}
+ public static AtomicInteger getEmptyPushMonitor() {
+ return INSTANCE.emptyPush;
+ }
+
public static AtomicInteger getTotalPushCountForAvg() {
return INSTANCE.totalPushCountForAvg;
}
+ public static AtomicInteger getServiceSubscribedEventQueueSize() {
+ return INSTANCE.serviceSubscribedEventQueueSize;
+ }
+
+ public static AtomicInteger getServiceChangedEventQueueSize() {
+ return INSTANCE.serviceChangedEventQueueSize;
+ }
+
+ public static AtomicInteger getPushPendingTaskCount() {
+ return INSTANCE.pushPendingTaskCount;
+ }
+
public static AtomicLong getTotalPushCostForAvg() {
return INSTANCE.totalPushCostForAvg;
}
+ public static AtomicInteger getNamingSubscriber(String version) {
+ return INSTANCE.namingSubscriber.get(version);
+ }
+
+ public static AtomicInteger getNamingPublisher(String version) {
+ return INSTANCE.namingPublisher.get(version);
+ }
+
+ public static TopnCounterMetricsContainer getServiceChangeCount() {
+ return INSTANCE.serviceChangeCount;
+ }
+
public static void compareAndSetMaxPushCost(long newCost) {
INSTANCE.maxPushCost.getAndUpdate((prev) -> Math.max(newCost, prev));
}
@@ -154,6 +231,10 @@ public class MetricsMonitor {
INSTANCE.failedPush.incrementAndGet();
}
+ public static void incrementEmptyPush() {
+ INSTANCE.emptyPush.incrementAndGet();
+ }
+
public static void incrementInstanceCount() {
INSTANCE.ipCount.incrementAndGet();
}
@@ -170,12 +251,16 @@ public class MetricsMonitor {
INSTANCE.subscriberCount.decrementAndGet();
}
+ public static void incrementServiceChangeCount(String namespace, String group, String name) {
+ INSTANCE.serviceChangeCount.increment(namespace + "@" + group + "@" + name);
+ }
+
public static Counter getDiskException() {
- return Metrics.counter("nacos_exception", "module", "naming", "name", "disk");
+ return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "naming", "name", "disk");
}
public static Counter getLeaderSendBeatFailedException() {
- return Metrics.counter("nacos_exception", "module", "naming", "name", "leaderSendBeatFailed");
+ return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "naming", "name", "leaderSendBeatFailed");
}
/**
@@ -214,6 +299,7 @@ public class MetricsMonitor {
public static void resetPush() {
getTotalPushMonitor().set(0);
getFailedPushMonitor().set(0);
+ getEmptyPushMonitor().set(0);
getTotalPushCostForAvg().set(0);
getTotalPushCountForAvg().set(0);
getMaxPushCostMonitor().set(-1);
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/monitor/NamingDynamicMeterRefreshService.java b/naming/src/main/java/com/alibaba/nacos/naming/monitor/NamingDynamicMeterRefreshService.java
new file mode 100644
index 000000000..8505fc083
--- /dev/null
+++ b/naming/src/main/java/com/alibaba/nacos/naming/monitor/NamingDynamicMeterRefreshService.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 1999-2022 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.naming.monitor;
+
+import com.alibaba.nacos.common.utils.Pair;
+import com.alibaba.nacos.core.monitor.NacosMeterRegistryCenter;
+import io.micrometer.core.instrument.ImmutableTag;
+import io.micrometer.core.instrument.Tag;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * dynamic meter refresh service.
+ *
+ * @author liuyixiao
+ */
+@Service
+public class NamingDynamicMeterRefreshService {
+
+ private static final String TOPN_SERVICE_CHANGE_REGISTRY = NacosMeterRegistryCenter.TOPN_SERVICE_CHANGE_REGISTRY;
+
+ private static final int SERVICE_CHANGE_N = 10;
+
+ /**
+ * refresh service change count top n per 30s.
+ */
+ @Scheduled(cron = "0/30 * * * * *")
+ public void refreshTopnConfigChangeCount() {
+ NacosMeterRegistryCenter.clear(TOPN_SERVICE_CHANGE_REGISTRY);
+ List> topnServiceChangeCount = MetricsMonitor.getServiceChangeCount()
+ .getTopNCounter(SERVICE_CHANGE_N);
+ for (Pair serviceChangeCount : topnServiceChangeCount) {
+ List tags = new ArrayList<>();
+ tags.add(new ImmutableTag("service", serviceChangeCount.getFirst()));
+ NacosMeterRegistryCenter.gauge(TOPN_SERVICE_CHANGE_REGISTRY, "service_change_count", tags, serviceChangeCount.getSecond());
+ }
+ }
+
+ /**
+ * reset service change count to 0 every week.
+ */
+ @Scheduled(cron = "0 0 0 ? * 1")
+ public void resetTopnServiceChangeCount() {
+ MetricsMonitor.getServiceChangeCount().removeAll();
+ }
+}
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/NamingSubAndPubMetricsCollector.java b/naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/NamingSubAndPubMetricsCollector.java
new file mode 100644
index 000000000..9ba2ba073
--- /dev/null
+++ b/naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/NamingSubAndPubMetricsCollector.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 1999-2022 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.naming.monitor.collector;
+
+import com.alibaba.nacos.common.executor.ExecutorFactory;
+import com.alibaba.nacos.naming.core.v2.client.Client;
+import com.alibaba.nacos.naming.core.v2.client.manager.impl.ConnectionBasedClientManager;
+import com.alibaba.nacos.naming.core.v2.client.manager.impl.EphemeralIpPortClientManager;
+import com.alibaba.nacos.naming.core.v2.client.manager.impl.PersistentIpPortClientManager;
+import com.alibaba.nacos.naming.monitor.MetricsMonitor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * v1 and v2 naming subscriber and publisher metrics collector.
+ *
+ * @author liuyixiao
+ */
+@Service
+public class NamingSubAndPubMetricsCollector {
+
+ private static final long DELAY_SECONDS = 5;
+
+ private static ScheduledExecutorService executorService = ExecutorFactory.newSingleScheduledExecutorService(r -> {
+ Thread thread = new Thread(r, "nacos.naming.monitor.NamingSubAndPubMetricsCollector");
+ thread.setDaemon(true);
+ return thread;
+ });
+
+ @Autowired
+ public NamingSubAndPubMetricsCollector(ConnectionBasedClientManager connectionBasedClientManager,
+ EphemeralIpPortClientManager ephemeralIpPortClientManager, PersistentIpPortClientManager persistentIpPortClientManager) {
+ executorService.scheduleWithFixedDelay(() -> {
+ int v1SubscriberCount = 0;
+ int v1PublisherCount = 0;
+ for (String clientId : ephemeralIpPortClientManager.allClientId()) {
+ Client client = ephemeralIpPortClientManager.getClient(clientId);
+ if (null != client) {
+ v1PublisherCount += client.getAllPublishedService().size();
+ v1SubscriberCount += client.getAllSubscribeService().size();
+ }
+ }
+ for (String clientId : persistentIpPortClientManager.allClientId()) {
+ Client client = persistentIpPortClientManager.getClient(clientId);
+ if (null != client) {
+ v1PublisherCount += client.getAllPublishedService().size();
+ v1SubscriberCount += client.getAllSubscribeService().size();
+ }
+ }
+ MetricsMonitor.getNamingSubscriber("v1").set(v1SubscriberCount);
+ MetricsMonitor.getNamingPublisher("v1").set(v1PublisherCount);
+
+ int v2SubscriberCount = 0;
+ int v2PublisherCount = 0;
+ for (String clientId : connectionBasedClientManager.allClientId()) {
+ Client client = connectionBasedClientManager.getClient(clientId);
+ if (null != client) {
+ v2PublisherCount += client.getAllPublishedService().size();
+ v2SubscriberCount += client.getAllSubscribeService().size();
+ }
+ }
+ MetricsMonitor.getNamingSubscriber("v2").set(v2SubscriberCount);
+ MetricsMonitor.getNamingPublisher("v2").set(v2PublisherCount);
+ }, DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);
+ }
+}
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/PushPendingTaskCountMetricsCollector.java b/naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/PushPendingTaskCountMetricsCollector.java
new file mode 100644
index 000000000..f23b6e6fc
--- /dev/null
+++ b/naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/PushPendingTaskCountMetricsCollector.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 1999-2022 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.naming.monitor.collector;
+
+import com.alibaba.nacos.common.executor.ExecutorFactory;
+import com.alibaba.nacos.naming.monitor.MetricsMonitor;
+import com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * pending push task metrics collector.
+ *
+ * @author liuyixiao
+ */
+@Service
+public class PushPendingTaskCountMetricsCollector {
+
+ private static final long DELAY_SECONDS = 2;
+
+ private static ScheduledExecutorService executorService = ExecutorFactory.newSingleScheduledExecutorService(r -> {
+ Thread thread = new Thread(r, "nacos.naming.monitor.PushPendingTaskCountMetricsCollector");
+ thread.setDaemon(true);
+ return thread;
+ });
+
+ @Autowired
+ public PushPendingTaskCountMetricsCollector(NamingSubscriberServiceV2Impl namingSubscriberServiceV2) {
+ executorService.scheduleWithFixedDelay(() -> {
+ MetricsMonitor.getPushPendingTaskCount().set(namingSubscriberServiceV2.getPushPendingTaskCount());
+ }, DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);
+ }
+}
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/ServiceEventQueueSizeMetricsCollector.java b/naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/ServiceEventQueueSizeMetricsCollector.java
new file mode 100644
index 000000000..fe387de60
--- /dev/null
+++ b/naming/src/main/java/com/alibaba/nacos/naming/monitor/collector/ServiceEventQueueSizeMetricsCollector.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 1999-2022 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.naming.monitor.collector;
+
+import com.alibaba.nacos.common.executor.ExecutorFactory;
+import com.alibaba.nacos.common.notify.NotifyCenter;
+import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
+import com.alibaba.nacos.naming.monitor.MetricsMonitor;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ServiceEvent queue size metrics collector.
+ *
+ * @author liuyixiao
+ */
+@Service
+public class ServiceEventQueueSizeMetricsCollector {
+
+ private static final long DELAY_SECONDS = 2;
+
+ private static ScheduledExecutorService executorService = ExecutorFactory.newSingleScheduledExecutorService(r -> {
+ Thread thread = new Thread(r, "nacos.naming.monitor.ServiceEventQueueSizeMetricsCollector");
+ thread.setDaemon(true);
+ return thread;
+ });
+
+ public ServiceEventQueueSizeMetricsCollector() {
+ executorService.scheduleWithFixedDelay(() -> {
+ MetricsMonitor.getServiceSubscribedEventQueueSize().set(
+ (int) NotifyCenter.getPublisher(ServiceEvent.ServiceSubscribedEvent.class).currentEventSize());
+ MetricsMonitor.getServiceChangedEventQueueSize().set(
+ (int) NotifyCenter.getPublisher(ServiceEvent.ServiceChangedEvent.class).currentEventSize());
+ }, DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);
+ }
+}
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2Impl.java b/naming/src/main/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2Impl.java
index 693f6cf22..0841b4197 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2Impl.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/push/v2/NamingSubscriberServiceV2Impl.java
@@ -30,6 +30,7 @@ import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.SwitchDomain;
+import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.NamingSubscriberService;
import com.alibaba.nacos.naming.push.v2.executor.PushExecutorDelegate;
@@ -125,6 +126,7 @@ public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements Na
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
+ MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// If service is subscribed by one client, only push this client.
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
@@ -138,4 +140,8 @@ public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements Na
Collection services = indexesManager.getSubscribedService();
return services.size() > PARALLEL_SIZE ? services.parallelStream() : services.stream();
}
+
+ public int getPushPendingTaskCount() {
+ return delayTaskEngine.size();
+ }
}
diff --git a/naming/src/main/java/com/alibaba/nacos/naming/push/v2/hook/NacosMonitorPushResultHook.java b/naming/src/main/java/com/alibaba/nacos/naming/push/v2/hook/NacosMonitorPushResultHook.java
index 5ab5f6cb8..b7a328350 100644
--- a/naming/src/main/java/com/alibaba/nacos/naming/push/v2/hook/NacosMonitorPushResultHook.java
+++ b/naming/src/main/java/com/alibaba/nacos/naming/push/v2/hook/NacosMonitorPushResultHook.java
@@ -32,6 +32,9 @@ public class NacosMonitorPushResultHook implements PushResultHook {
MetricsMonitor.incrementPush();
MetricsMonitor.incrementPushCost(result.getAllCost());
MetricsMonitor.compareAndSetMaxPushCost(result.getAllCost());
+ if (null == result.getData().getHosts() || !result.getData().validate()) {
+ MetricsMonitor.incrementEmptyPush();
+ }
if (isRpc(result.getSubscriber())) {
NamingTpsMonitor.rpcPushSuccess(result.getSubscribeClientId(), result.getSubscriber().getIp());
} else {
diff --git a/naming/src/test/java/com/alibaba/nacos/naming/monitor/MetricsMonitorTest.java b/naming/src/test/java/com/alibaba/nacos/naming/monitor/MetricsMonitorTest.java
index 06d94b490..ef923aee7 100644
--- a/naming/src/test/java/com/alibaba/nacos/naming/monitor/MetricsMonitorTest.java
+++ b/naming/src/test/java/com/alibaba/nacos/naming/monitor/MetricsMonitorTest.java
@@ -16,15 +16,34 @@
package com.alibaba.nacos.naming.monitor;
+import com.alibaba.nacos.core.monitor.NacosMeterRegistryCenter;
+import com.alibaba.nacos.sys.utils.ApplicationUtils;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.context.ConfigurableApplicationContext;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.Silent.class)
public class MetricsMonitorTest {
+ @Mock
+ private ConfigurableApplicationContext context;
+
@Before
public void setUp() {
+ ApplicationUtils.injectContext(context);
+ when(context.getBean(PrometheusMeterRegistry.class)).thenReturn(null);
+ // add simple meterRegistry.
+ NacosMeterRegistryCenter.getMeterRegistry(NacosMeterRegistryCenter.NAMING_STABLE_REGISTRY)
+ .add(new SimpleMeterRegistry());
+
MetricsMonitor.resetPush();
}
diff --git a/naming/src/test/java/com/alibaba/nacos/naming/push/v2/hook/NacosMonitorPushResultHookTest.java b/naming/src/test/java/com/alibaba/nacos/naming/push/v2/hook/NacosMonitorPushResultHookTest.java
new file mode 100644
index 000000000..8c4082eb9
--- /dev/null
+++ b/naming/src/test/java/com/alibaba/nacos/naming/push/v2/hook/NacosMonitorPushResultHookTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 1999-2022 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.naming.push.v2.hook;
+
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
+import com.alibaba.nacos.core.remote.control.TpsMonitorManager;
+import com.alibaba.nacos.naming.monitor.MetricsMonitor;
+import com.alibaba.nacos.naming.pojo.Subscriber;
+import com.alibaba.nacos.sys.utils.ApplicationUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.context.ConfigurableApplicationContext;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+/**
+ * test for NacosMonitorPushResultHook.
+ *
+ * @author liuyixiao
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class NacosMonitorPushResultHookTest {
+
+ @Mock
+ private PushResult pushResult;
+
+ @Mock
+ private Subscriber subscriber;
+
+ @Mock
+ private TpsMonitorManager tpsMonitorManager;
+
+ @Mock
+ private ConfigurableApplicationContext context;
+
+ @Mock
+ private Instance instance;
+
+ private final ServiceInfo serviceInfo = new ServiceInfo("name", "cluster");
+
+ private final long allCost = 100L;
+
+ @Before
+ public void setUp() {
+ MetricsMonitor.resetAll();
+ serviceInfo.setHosts(new ArrayList<>());
+ subscriber.setIp("0.0.0.0");
+ when(instance.getWeight()).thenReturn(1.0);
+ when(instance.isHealthy()).thenReturn(true);
+ when(pushResult.getAllCost()).thenReturn(allCost);
+ when(pushResult.getData()).thenReturn(serviceInfo);
+ when(pushResult.getSubscriber()).thenReturn(subscriber);
+ ApplicationUtils.injectContext(context);
+ when(context.getBean(TpsMonitorManager.class)).thenReturn(tpsMonitorManager);
+ }
+
+ @Test
+ public void testPushSuccessForEmptyPush() {
+ new NacosMonitorPushResultHook().pushSuccess(pushResult);
+ assertEquals(1, MetricsMonitor.getTotalPushMonitor().get());
+ assertEquals(1, MetricsMonitor.getEmptyPushMonitor().get());
+ assertEquals(allCost, MetricsMonitor.getMaxPushCostMonitor().get());
+ }
+
+ @Test
+ public void testPushSuccessForNoEmptyPush() {
+ ArrayList hosts = new ArrayList<>();
+ hosts.add(instance);
+ serviceInfo.setHosts(hosts);
+ new NacosMonitorPushResultHook().pushSuccess(pushResult);
+ assertEquals(1, MetricsMonitor.getTotalPushMonitor().get());
+ assertEquals(0, MetricsMonitor.getEmptyPushMonitor().get());
+ assertEquals(allCost, MetricsMonitor.getMaxPushCostMonitor().get());
+ }
+
+ @Test
+ public void testPushFailed() {
+ new NacosMonitorPushResultHook().pushFailed(pushResult);
+ assertEquals(1, MetricsMonitor.getFailedPushMonitor().get());
+ }
+}