Merge Asoc2022#8461 into 2.2.0
This commit is contained in:
杨翊 SionYang 2022-10-25 15:45:50 +08:00 committed by GitHub
commit a0816b5534
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1249 additions and 102 deletions

View File

@ -0,0 +1,217 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.utils;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* container for top n counter metrics, increment and remove cost O(1) time.
*
* @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/
public class TopnCounterMetricsContainer {
/**
* dataId -> count.
*/
private ConcurrentHashMap<String, AtomicInteger> dataCount;
/**
* count -> node.
*/
private ConcurrentHashMap<Integer, DoublyLinkedNode> specifiedCountDataIdSets;
private DoublyLinkedNode dummyHead;
public TopnCounterMetricsContainer() {
dataCount = new ConcurrentHashMap<>();
specifiedCountDataIdSets = new ConcurrentHashMap<>();
dummyHead = new DoublyLinkedNode(null, null, null, -1);
dummyHead.next = new DoublyLinkedNode(null, dummyHead, new ConcurrentHashSet<>(), 0);
specifiedCountDataIdSets.put(0, dummyHead.next);
}
public List<Pair<String, AtomicInteger>> getTopNCounter(int n) {
List<Pair<String, AtomicInteger>> topnCounter = new LinkedList<>();
DoublyLinkedNode curr = dummyHead;
while (curr.next != null && topnCounter.size() < n) {
for (String dataId : curr.next.dataSet) {
// use inner AtomicInteger to reflect change to prometheus
topnCounter.add(new Pair<>(dataId, dataCount.get(dataId)));
if (topnCounter.size() == n) {
break;
}
}
curr = curr.next;
}
return topnCounter;
}
/**
* put(String dataId, 0).
*
* @param dataId data name or data key.
*/
public void put(String dataId) {
put(dataId, 0);
}
/**
* put new data into container, if already exist, update it.
* this method could be slow (O(N)), most time use increment.
*
* @param dataId data name or data key.
* @param count data count.
*/
public void put(String dataId, int count) {
if (dataCount.containsKey(dataId)) {
removeFromSpecifiedCountDataIdSets(dataId);
dataCount.get(dataId).set(count);
} else {
dataCount.put(dataId, new AtomicInteger(count));
}
insertIntoSpecifiedCountDataIdSets(dataId, count);
}
/**
* get data count by dataId.
*
* @param dataId data name or data key.
* @return data count or -1 if not exist.
*/
public int get(String dataId) {
if (dataCount.containsKey(dataId)) {
return dataCount.get(dataId).get();
}
return -1;
}
/**
* increment the count of dataId.
*
* @param dataId data name or data key.
*/
public void increment(String dataId) {
if (!dataCount.containsKey(dataId)) {
put(dataId);
}
DoublyLinkedNode prev = removeFromSpecifiedCountDataIdSets(dataId);
int newCount = dataCount.get(dataId).incrementAndGet();
if (!isDummyHead(prev) && prev.count == newCount) {
insertIntoSpecifiedCountDataIdSets(dataId, prev);
} else {
// prev.count > newCount
DoublyLinkedNode newNode = new DoublyLinkedNode(prev.next, prev, new ConcurrentHashSet<>(), newCount);
if (prev.next != null) {
prev.next.prev = newNode;
}
prev.next = newNode;
newNode.dataSet.add(dataId);
specifiedCountDataIdSets.put(newCount, newNode);
}
}
/**
* remove data.
*
* @param dataId data name or data key.
* @return data count or null if data is not exist.
*/
public AtomicInteger remove(String dataId) {
if (dataCount.containsKey(dataId)) {
removeFromSpecifiedCountDataIdSets(dataId);
return dataCount.remove(dataId);
}
return null;
}
/**
* remove all data.
*/
public void removeAll() {
for (String dataId : dataCount.keySet()) {
removeFromSpecifiedCountDataIdSets(dataId);
}
dataCount.clear();
}
private DoublyLinkedNode removeFromSpecifiedCountDataIdSets(String dataId) {
int count = dataCount.get(dataId).get();
DoublyLinkedNode node = specifiedCountDataIdSets.get(count);
node.dataSet.remove(dataId);
// keep the 0 count node.
if (node.dataSet.size() == 0 && node.count != 0) {
node.prev.next = node.next;
if (node.next != null) {
node.next.prev = node.prev;
}
specifiedCountDataIdSets.remove(node.count);
}
return node.prev;
}
private void insertIntoSpecifiedCountDataIdSets(String dataId, int count) {
if (specifiedCountDataIdSets.containsKey(count)) {
specifiedCountDataIdSets.get(count).dataSet.add(dataId);
} else {
DoublyLinkedNode prev = dummyHead;
while (prev.next != null) {
if (prev.next.count < count) {
break;
} else {
prev = prev.next;
}
}
DoublyLinkedNode newNode = new DoublyLinkedNode(prev.next, prev, new ConcurrentHashSet<>(), count);
if (prev.next != null) {
prev.next.prev = newNode;
}
prev.next = newNode;
newNode.dataSet.add(dataId);
specifiedCountDataIdSets.put(count, newNode);
}
}
private void insertIntoSpecifiedCountDataIdSets(String dataId, DoublyLinkedNode targetSet) {
targetSet.dataSet.add(dataId);
}
private boolean isDummyHead(DoublyLinkedNode node) {
return node.count == -1;
}
private class DoublyLinkedNode {
public DoublyLinkedNode next;
public DoublyLinkedNode prev;
public ConcurrentHashSet<String> dataSet;
public int count;
public DoublyLinkedNode(DoublyLinkedNode next, DoublyLinkedNode prev, ConcurrentHashSet<String> dataSet, int count) {
this.next = next;
this.prev = prev;
this.dataSet = dataSet;
this.count = count;
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.utils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/**
* unit test for TopNCounterMetricsContainer.
*
* @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/
public class TopnCounterMetricsContainerTest {
private TopnCounterMetricsContainer topnCounterMetricsContainer;
@Before
public void setUp() {
topnCounterMetricsContainer = new TopnCounterMetricsContainer();
}
@Test
public void testPut() {
topnCounterMetricsContainer.put("test");
Assert.assertEquals(0, topnCounterMetricsContainer.get("test"));
topnCounterMetricsContainer.put("test1", 1);
Assert.assertEquals(1, topnCounterMetricsContainer.get("test1"));
}
@Test
public void testIncrement() {
topnCounterMetricsContainer.put("test", 0);
topnCounterMetricsContainer.increment("test");
Assert.assertEquals(1, topnCounterMetricsContainer.get("test"));
}
@Test
public void testRemove() {
topnCounterMetricsContainer.put("test");
Assert.assertEquals(0, topnCounterMetricsContainer.get("test"));
topnCounterMetricsContainer.remove("test");
Assert.assertEquals(-1, topnCounterMetricsContainer.get("test"));
}
@Test
public void testRemoveAll() {
topnCounterMetricsContainer.put("test");
topnCounterMetricsContainer.put("test1");
topnCounterMetricsContainer.put("test2");
topnCounterMetricsContainer.removeAll();
Assert.assertEquals(-1, topnCounterMetricsContainer.get("test"));
Assert.assertEquals(-1, topnCounterMetricsContainer.get("test1"));
Assert.assertEquals(-1, topnCounterMetricsContainer.get("test2"));
}
@Test
public void testGetTopNCounterAndRemoveAll() {
final int N = 10;
String dataIds = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
List<Pair<String, AtomicInteger>> dataList = new ArrayList<>();
for (int i = 0; i < dataIds.length(); i++) {
topnCounterMetricsContainer.put(dataIds.substring(i, i + 1));
dataList.add(new Pair<>(dataIds.substring(i, i + 1), new AtomicInteger()));
}
Random random = new Random();
for (int i = 0; i < 10000; i++) {
int j = random.nextInt(dataIds.length());
topnCounterMetricsContainer.increment(dataIds.substring(j, j + 1));
dataList.get(j).getSecond().incrementAndGet();
}
boolean right = true;
Collections.sort(dataList, (a, b) -> b.getSecond().get() - a.getSecond().get());
List<Pair<String, AtomicInteger>> result = topnCounterMetricsContainer.getTopNCounter(N);
for (Pair<String, AtomicInteger> item : result) {
// ensure every top N count is greater than (N+1)th greatest.
if (item.getSecond().get() < dataList.get(N).getSecond().get()) {
right = false;
break;
}
}
Assert.assertTrue(right);
topnCounterMetricsContainer.removeAll();
for (int i = 0; i < dataIds.length(); i++) {
Assert.assertEquals(-1, topnCounterMetricsContainer.get(dataIds.substring(i, i + 1)));
}
Assert.assertEquals(0, topnCounterMetricsContainer.getTopNCounter(N).size());
}
}

View File

@ -37,6 +37,8 @@ import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* * Created with IntelliJ IDEA. User: dingjoey Date: 13-12-12 Time: 21:12 client api && sdk api 请求日志打点逻辑 * * Created with IntelliJ IDEA. User: dingjoey Date: 13-12-12 Time: 21:12 client api && sdk api 请求日志打点逻辑
@ -110,8 +112,11 @@ public class RequestLogAspect {
final String md5 = final String md5 =
request.getContent() == null ? null : MD5Utils.md5Hex(request.getContent(), Constants.ENCODE); request.getContent() == null ? null : MD5Utils.md5Hex(request.getContent(), Constants.ENCODE);
MetricsMonitor.getPublishMonitor().incrementAndGet(); MetricsMonitor.getPublishMonitor().incrementAndGet();
return logClientRequestRpc("publish", pjp, request, meta, request.getDataId(), request.getGroup(), AtomicLong rtHolder = new AtomicLong();
request.getTenant(), md5); Object retVal = logClientRequestRpc("publish", pjp, request, meta, request.getDataId(), request.getGroup(),
request.getTenant(), md5, rtHolder);
MetricsMonitor.getWriteConfigRpcRtTimer().record(rtHolder.get(), TimeUnit.MILLISECONDS);
return retVal;
} }
/** /**
@ -122,7 +127,10 @@ public class RequestLogAspect {
HttpServletResponse response, String dataId, String group, String tenant, String content) throws Throwable { HttpServletResponse response, String dataId, String group, String tenant, String content) throws Throwable {
final String md5 = content == null ? null : MD5Utils.md5Hex(content, Constants.ENCODE); final String md5 = content == null ? null : MD5Utils.md5Hex(content, Constants.ENCODE);
MetricsMonitor.getPublishMonitor().incrementAndGet(); MetricsMonitor.getPublishMonitor().incrementAndGet();
return logClientRequest("publish", pjp, request, response, dataId, group, tenant, md5); AtomicLong rtHolder = new AtomicLong();
Object retVal = logClientRequest("publish", pjp, request, response, dataId, group, tenant, md5, rtHolder);
MetricsMonitor.getWriteConfigRtTimer().record(rtHolder.get(), TimeUnit.MILLISECONDS);
return retVal;
} }
/** /**
@ -131,7 +139,7 @@ public class RequestLogAspect {
@Around(CLIENT_INTERFACE_REMOVE_ALL_CONFIG) @Around(CLIENT_INTERFACE_REMOVE_ALL_CONFIG)
public Object interfaceRemoveAll(ProceedingJoinPoint pjp, HttpServletRequest request, HttpServletResponse response, public Object interfaceRemoveAll(ProceedingJoinPoint pjp, HttpServletRequest request, HttpServletResponse response,
String dataId, String group, String tenant) throws Throwable { String dataId, String group, String tenant) throws Throwable {
return logClientRequest("remove", pjp, request, response, dataId, group, tenant, null); return logClientRequest("remove", pjp, request, response, dataId, group, tenant, null, null);
} }
/** /**
@ -141,7 +149,7 @@ public class RequestLogAspect {
public Object interfaceRemoveAllRpc(ProceedingJoinPoint pjp, ConfigRemoveRequest request, RequestMeta meta) public Object interfaceRemoveAllRpc(ProceedingJoinPoint pjp, ConfigRemoveRequest request, RequestMeta meta)
throws Throwable { throws Throwable {
return logClientRequestRpc("remove", pjp, request, meta, request.getDataId(), request.getGroup(), return logClientRequestRpc("remove", pjp, request, meta, request.getDataId(), request.getGroup(),
request.getTenant(), null); request.getTenant(), null, null);
} }
/** /**
@ -153,7 +161,10 @@ public class RequestLogAspect {
final String groupKey = GroupKey2.getKey(dataId, group, tenant); final String groupKey = GroupKey2.getKey(dataId, group, tenant);
final String md5 = ConfigCacheService.getContentMd5(groupKey); final String md5 = ConfigCacheService.getContentMd5(groupKey);
MetricsMonitor.getConfigMonitor().incrementAndGet(); MetricsMonitor.getConfigMonitor().incrementAndGet();
return logClientRequest("get", pjp, request, response, dataId, group, tenant, md5); AtomicLong rtHolder = new AtomicLong();
Object retVal = logClientRequest("get", pjp, request, response, dataId, group, tenant, md5, rtHolder);
MetricsMonitor.getReadConfigRtTimer().record(rtHolder.get(), TimeUnit.MILLISECONDS);
return retVal;
} }
/** /**
@ -165,20 +176,26 @@ public class RequestLogAspect {
final String groupKey = GroupKey2.getKey(request.getDataId(), request.getGroup(), request.getTenant()); final String groupKey = GroupKey2.getKey(request.getDataId(), request.getGroup(), request.getTenant());
final String md5 = ConfigCacheService.getContentMd5(groupKey); final String md5 = ConfigCacheService.getContentMd5(groupKey);
MetricsMonitor.getConfigMonitor().incrementAndGet(); MetricsMonitor.getConfigMonitor().incrementAndGet();
return logClientRequestRpc("get", pjp, request, meta, request.getDataId(), request.getGroup(), AtomicLong rtHolder = new AtomicLong();
request.getTenant(), md5); Object retVal = logClientRequestRpc("get", pjp, request, meta, request.getDataId(), request.getGroup(),
request.getTenant(), md5, rtHolder);
MetricsMonitor.getReadConfigRpcRtTimer().record(rtHolder.get(), TimeUnit.MILLISECONDS);
return retVal;
} }
/** /**
* Client api request log rt | status | requestIp | opType | dataId | group | datumId | md5. * Client api request log rt | status | requestIp | opType | dataId | group | datumId | md5.
*/ */
private Object logClientRequest(String requestType, ProceedingJoinPoint pjp, HttpServletRequest request, private Object logClientRequest(String requestType, ProceedingJoinPoint pjp, HttpServletRequest request,
HttpServletResponse response, String dataId, String group, String tenant, String md5) throws Throwable { HttpServletResponse response, String dataId, String group, String tenant, String md5, AtomicLong rtHolder) throws Throwable {
final String requestIp = RequestUtil.getRemoteIp(request); final String requestIp = RequestUtil.getRemoteIp(request);
String appName = request.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String appName = request.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
final long st = System.currentTimeMillis(); final long st = System.currentTimeMillis();
Object retVal = pjp.proceed(); Object retVal = pjp.proceed();
final long rt = System.currentTimeMillis() - st; final long rt = System.currentTimeMillis() - st;
if (rtHolder != null) {
rtHolder.set(rt);
}
// rt | status | requestIp | opType | dataId | group | datumId | md5 | // rt | status | requestIp | opType | dataId | group | datumId | md5 |
// appName // appName
LogUtil.CLIENT_LOG LogUtil.CLIENT_LOG
@ -191,12 +208,15 @@ public class RequestLogAspect {
* Client api request log rt | status | requestIp | opType | dataId | group | datumId | md5. * Client api request log rt | status | requestIp | opType | dataId | group | datumId | md5.
*/ */
private Object logClientRequestRpc(String requestType, ProceedingJoinPoint pjp, Request request, RequestMeta meta, private Object logClientRequestRpc(String requestType, ProceedingJoinPoint pjp, Request request, RequestMeta meta,
String dataId, String group, String tenant, String md5) throws Throwable { String dataId, String group, String tenant, String md5, AtomicLong rtHolder) throws Throwable {
final String requestIp = meta.getClientIp(); final String requestIp = meta.getClientIp();
String appName = request.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String appName = request.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
final long st = System.currentTimeMillis(); final long st = System.currentTimeMillis();
Response retVal = (Response) pjp.proceed(); Response retVal = (Response) pjp.proceed();
final long rt = System.currentTimeMillis() - st; final long rt = System.currentTimeMillis() - st;
if (rtHolder != null) {
rtHolder.set(rt);
}
// rt | status | requestIp | opType | dataId | group | datumId | md5 | // rt | status | requestIp | opType | dataId | group | datumId | md5 |
// appName // appName
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}|{}|{}", rt, LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}|{}|{}", rt,
@ -224,5 +244,4 @@ public class RequestLogAspect {
request.isListen(), "", "", appName); request.isListen(), "", "", appName);
return retVal; return retVal;
} }
} }

View File

@ -39,6 +39,7 @@ import com.alibaba.nacos.config.server.model.SampleResult;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.model.ConfigRequestInfo; import com.alibaba.nacos.config.server.model.ConfigRequestInfo;
import com.alibaba.nacos.config.server.model.form.ConfigForm; import com.alibaba.nacos.config.server.model.form.ConfigForm;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.result.code.ResultCodeEnum; import com.alibaba.nacos.config.server.result.code.ResultCodeEnum;
import com.alibaba.nacos.config.server.service.ConfigChangePublisher; import com.alibaba.nacos.config.server.service.ConfigChangePublisher;
import com.alibaba.nacos.config.server.service.ConfigOperationService; import com.alibaba.nacos.config.server.service.ConfigOperationService;
@ -388,6 +389,7 @@ public class ConfigController {
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "config_tags", required = false) String configTags, @RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam("pageNo") int pageNo, @RequestParam("pageSize") int pageSize) { @RequestParam("pageNo") int pageNo, @RequestParam("pageSize") int pageSize) {
MetricsMonitor.getFuzzySearchMonitor().incrementAndGet();
Map<String, Object> configAdvanceInfo = new HashMap<>(50); Map<String, Object> configAdvanceInfo = new HashMap<>(50);
if (StringUtils.isNotBlank(appName)) { if (StringUtils.isNotBlank(appName)) {
configAdvanceInfo.put("appName", appName); configAdvanceInfo.put("appName", appName);

View File

@ -0,0 +1,64 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.monitor;
import com.alibaba.nacos.common.utils.Pair;
import com.alibaba.nacos.core.monitor.NacosMeterRegistryCenter;
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Tag;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* dynamic meter refresh service.
*
* @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/
@Service
public class ConfigDynamicMeterRefreshService {
private static final String TOPN_CONFIG_CHANGE_REGISTRY = NacosMeterRegistryCenter.TOPN_CONFIG_CHANGE_REGISTRY;
private static final int CONFIG_CHANGE_N = 10;
/**
* refresh config change count top n per 30s.
*/
@Scheduled(cron = "0/30 * * * * *")
public void refreshTopnConfigChangeCount() {
NacosMeterRegistryCenter.clear(TOPN_CONFIG_CHANGE_REGISTRY);
List<Pair<String, AtomicInteger>> topnConfigChangeCount = MetricsMonitor.getConfigChangeCount()
.getTopNCounter(CONFIG_CHANGE_N);
for (Pair<String, AtomicInteger> configChangeCount : topnConfigChangeCount) {
List<Tag> tags = new ArrayList<>();
tags.add(new ImmutableTag("config", configChangeCount.getFirst()));
NacosMeterRegistryCenter.gauge(TOPN_CONFIG_CHANGE_REGISTRY, "config_change_count", tags, configChangeCount.getSecond());
}
}
/**
* reset config change count to 0 every week.
*/
@Scheduled(cron = "0 0 0 ? * 1")
public void resetTopnConfigChangeCount() {
MetricsMonitor.getConfigChangeCount().removeAll();
}
}

View File

@ -48,9 +48,13 @@ public class MemoryMonitor {
private static final long DELAY_SECONDS = 10; private static final long DELAY_SECONDS = 10;
/**
* reset some metrics to 0 every day.
*/
@Scheduled(cron = "0 0 0 * * ?") @Scheduled(cron = "0 0 0 * * ?")
public void clear() { public void clear() {
MetricsMonitor.getConfigMonitor().set(0); MetricsMonitor.getConfigMonitor().set(0);
MetricsMonitor.getPublishMonitor().set(0); MetricsMonitor.getPublishMonitor().set(0);
MetricsMonitor.getFuzzySearchMonitor().set(0);
} }
} }

View File

@ -16,14 +16,16 @@
package com.alibaba.nacos.config.server.monitor; package com.alibaba.nacos.config.server.monitor;
import com.alibaba.nacos.common.utils.TopnCounterMetricsContainer;
import com.alibaba.nacos.core.monitor.NacosMeterRegistryCenter;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.ImmutableTag; import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -33,19 +35,21 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class MetricsMonitor { public class MetricsMonitor {
private static final String METER_REGISTRY = NacosMeterRegistryCenter.CONFIG_STABLE_REGISTRY;
private static AtomicInteger getConfig = new AtomicInteger(); private static AtomicInteger getConfig = new AtomicInteger();
private static AtomicInteger publish = new AtomicInteger(); private static AtomicInteger publish = new AtomicInteger();
/** /**
* task for notify config change to sub client of http long polling.. * task for notify config change to sub client of http long polling.
*/ */
private static AtomicInteger longPolling = new AtomicInteger(); private static AtomicInteger longPolling = new AtomicInteger();
private static AtomicInteger configCount = new AtomicInteger(); private static AtomicInteger configCount = new AtomicInteger();
/** /**
* task for ntify config change to cluster server. * task for notify config change to cluster server.
*/ */
private static AtomicInteger notifyTask = new AtomicInteger(); private static AtomicInteger notifyTask = new AtomicInteger();
@ -56,43 +60,74 @@ public class MetricsMonitor {
private static AtomicInteger dumpTask = new AtomicInteger(); private static AtomicInteger dumpTask = new AtomicInteger();
/**
* config fuzzy search count.
*/
private static AtomicInteger fuzzySearch = new AtomicInteger();
/**
* version -> client config subscriber count.
*/
private static ConcurrentHashMap<String, AtomicInteger> configSubscriber = new ConcurrentHashMap<>();
/**
* config change count.
*/
private static TopnCounterMetricsContainer configChangeCount = new TopnCounterMetricsContainer();
static { static {
ImmutableTag immutableTag = new ImmutableTag("module", "config"); ImmutableTag immutableTag = new ImmutableTag("module", "config");
List<Tag> tags = new ArrayList<>(); List<Tag> tags = new ArrayList<>();
tags.add(immutableTag); tags.add(immutableTag);
tags.add(new ImmutableTag("name", "getConfig")); tags.add(new ImmutableTag("name", "getConfig"));
Metrics.gauge("nacos_monitor", tags, getConfig); NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, getConfig);
tags = new ArrayList<>(); tags = new ArrayList<>();
tags.add(immutableTag); tags.add(immutableTag);
tags.add(new ImmutableTag("name", "publish")); tags.add(new ImmutableTag("name", "publish"));
Metrics.gauge("nacos_monitor", tags, publish); NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, publish);
tags = new ArrayList<>(); tags = new ArrayList<>();
tags.add(immutableTag); tags.add(immutableTag);
tags.add(new ImmutableTag("name", "longPolling")); tags.add(new ImmutableTag("name", "longPolling"));
Metrics.gauge("nacos_monitor", tags, longPolling); NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, longPolling);
tags = new ArrayList<>(); tags = new ArrayList<>();
tags.add(immutableTag); tags.add(immutableTag);
tags.add(new ImmutableTag("name", "configCount")); tags.add(new ImmutableTag("name", "configCount"));
Metrics.gauge("nacos_monitor", tags, configCount); NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, configCount);
tags = new ArrayList<>(); tags = new ArrayList<>();
tags.add(immutableTag); tags.add(immutableTag);
tags.add(new ImmutableTag("name", "notifyTask")); tags.add(new ImmutableTag("name", "notifyTask"));
Metrics.gauge("nacos_monitor", tags, notifyTask); NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, notifyTask);
tags = new ArrayList<>(); tags = new ArrayList<>();
tags.add(immutableTag); tags.add(immutableTag);
tags.add(new ImmutableTag("name", "notifyClientTask")); tags.add(new ImmutableTag("name", "notifyClientTask"));
Metrics.gauge("nacos_monitor", tags, notifyClientTask); NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, notifyClientTask);
tags = new ArrayList<>(); tags = new ArrayList<>();
tags.add(immutableTag); tags.add(immutableTag);
tags.add(new ImmutableTag("name", "dumpTask")); tags.add(new ImmutableTag("name", "dumpTask"));
Metrics.gauge("nacos_monitor", tags, dumpTask); NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, dumpTask);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "fuzzySearch"));
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, fuzzySearch);
configSubscriber.put("v1", new AtomicInteger(0));
configSubscriber.put("v2", new AtomicInteger(0));
tags = new ArrayList<>();
tags.add(new ImmutableTag("version", "v1"));
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_config_subscriber", tags, configSubscriber.get("v1"));
tags = new ArrayList<>();
tags.add(new ImmutableTag("version", "v2"));
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_config_subscriber", tags, configSubscriber.get("v2"));
} }
public static AtomicInteger getConfigMonitor() { public static AtomicInteger getConfigMonitor() {
@ -123,28 +158,59 @@ public class MetricsMonitor {
return dumpTask; return dumpTask;
} }
public static AtomicInteger getFuzzySearchMonitor() {
return fuzzySearch;
}
public static AtomicInteger getConfigSubscriberMonitor(String version) {
return configSubscriber.get(version);
}
public static TopnCounterMetricsContainer getConfigChangeCount() {
return configChangeCount;
}
public static Timer getReadConfigRtTimer() {
return NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_timer", "module", "config", "name", "readConfigRt");
}
public static Timer getReadConfigRpcRtTimer() {
return NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_timer", "module", "config", "name", "readConfigRpcRt");
}
public static Timer getWriteConfigRtTimer() {
return NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_timer", "module", "config", "name", "writeConfigRt");
}
public static Timer getWriteConfigRpcRtTimer() {
return NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_timer", "module", "config", "name", "writeConfigRpcRt");
}
public static Timer getNotifyRtTimer() { public static Timer getNotifyRtTimer() {
return Metrics.timer("nacos_timer", "module", "config", "name", "notifyRt"); return NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_timer", "module", "config", "name", "notifyRt");
} }
public static Counter getIllegalArgumentException() { public static Counter getIllegalArgumentException() {
return Metrics.counter("nacos_exception", "module", "config", "name", "illegalArgument"); return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "config", "name", "illegalArgument");
} }
public static Counter getNacosException() { public static Counter getNacosException() {
return Metrics.counter("nacos_exception", "module", "config", "name", "nacos"); return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "config", "name", "nacos");
} }
public static Counter getDbException() { public static Counter getDbException() {
return Metrics.counter("nacos_exception", "module", "config", "name", "db"); return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "config", "name", "db");
} }
public static Counter getConfigNotifyException() { public static Counter getConfigNotifyException() {
return Metrics.counter("nacos_exception", "module", "config", "name", "configNotify"); return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "config", "name", "configNotify");
} }
public static Counter getUnhealthException() { public static Counter getUnhealthException() {
return Metrics.counter("nacos_exception", "module", "config", "name", "unhealth"); return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "config", "name", "unhealth");
} }
public static void incrementConfigChangeCount(String tenant, String group, String dataId) {
configChangeCount.increment(tenant + "@" + group + "@" + dataId);
}
} }

View File

@ -0,0 +1,45 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.monitor.collector;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.remote.ConfigChangeListenContext;
import com.alibaba.nacos.config.server.service.LongPollingService;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
* v1 and v2 config subscriber metrics collector.
*
* @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/
@Service
public class ConfigSubscriberMetricsCollector {
private static final long DELAY_SECONDS = 5;
@Autowired
public ConfigSubscriberMetricsCollector(LongPollingService longPollingService, ConfigChangeListenContext configChangeListenContext) {
ConfigExecutor.scheduleConfigTask(() -> {
MetricsMonitor.getConfigSubscriberMonitor("v1").set(longPollingService.getSubscriberCount());
MetricsMonitor.getConfigSubscriberMonitor("v2").set(configChangeListenContext.getConnectionCount());
}, DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);
}
}

View File

@ -173,4 +173,13 @@ public class ConfigChangeListenContext {
return groupKeyContexts == null ? null : groupKeyContexts.get(groupKey); return groupKeyContexts == null ? null : groupKeyContexts.get(groupKey);
} }
/**
* get connection count.
*
* @return count of long connections.
*/
public int getConnectionCount() {
return connectionIdContext.size();
}
} }

View File

@ -530,4 +530,8 @@ public class LongPollingService {
public void setRetainIps(Map<String, Long> retainIps) { public void setRetainIps(Map<String, Long> retainIps) {
this.retainIps = retainIps; this.retainIps = retainIps;
} }
public int getSubscriberCount() {
return allSubs.size();
}
} }

View File

@ -103,6 +103,9 @@ public class AsyncNotifyService {
String group = evt.group; String group = evt.group;
String tenant = evt.tenant; String tenant = evt.tenant;
String tag = evt.tag; String tag = evt.tag;
MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);
Collection<Member> ipList = memberManager.allMembers(); Collection<Member> ipList = memberManager.allMembers();
// In fact, any type of queue here can be // In fact, any type of queue here can be

View File

@ -18,7 +18,6 @@ package com.alibaba.nacos.core.monitor;
import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.ImmutableTag; import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
@ -33,6 +32,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public final class MetricsMonitor { public final class MetricsMonitor {
private static final String METER_REGISTRY = NacosMeterRegistryCenter.CORE_STABLE_REGISTRY;
private static final DistributionSummary RAFT_READ_INDEX_FAILED; private static final DistributionSummary RAFT_READ_INDEX_FAILED;
private static final DistributionSummary RAFT_FROM_LEADER; private static final DistributionSummary RAFT_FROM_LEADER;
@ -44,16 +45,31 @@ public final class MetricsMonitor {
private static AtomicInteger longConnection = new AtomicInteger(); private static AtomicInteger longConnection = new AtomicInteger();
static { static {
RAFT_READ_INDEX_FAILED = NacosMeterRegistry.summary("protocol", "raft_read_index_failed"); ImmutableTag immutableTag = new ImmutableTag("module", "core");
RAFT_FROM_LEADER = NacosMeterRegistry.summary("protocol", "raft_read_from_leader");
RAFT_APPLY_LOG_TIMER = NacosMeterRegistry.timer("protocol", "raft_apply_log_timer");
RAFT_APPLY_READ_TIMER = NacosMeterRegistry.timer("protocol", "raft_apply_read_timer");
List<Tag> tags = new ArrayList<>(); List<Tag> tags = new ArrayList<>();
tags.add(new ImmutableTag("module", "config")); tags.add(immutableTag);
tags.add(new ImmutableTag("name", "raft_read_index_failed"));
RAFT_READ_INDEX_FAILED = NacosMeterRegistryCenter.summary(METER_REGISTRY, "nacos_monitor", tags);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "raft_read_from_leader"));
RAFT_FROM_LEADER = NacosMeterRegistryCenter.summary(METER_REGISTRY, "nacos_monitor", tags);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "raft_apply_log_timer"));
RAFT_APPLY_LOG_TIMER = NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_monitor", tags);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "raft_apply_read_timer"));
RAFT_APPLY_READ_TIMER = NacosMeterRegistryCenter.timer(METER_REGISTRY, "nacos_monitor", tags);
tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "longConnection")); tags.add(new ImmutableTag("name", "longConnection"));
Metrics.gauge("nacos_monitor", tags, longConnection); NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, longConnection);
} }

View File

@ -1,57 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.monitor;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import java.util.ArrayList;
import java.util.List;
/**
* Metrics unified usage center.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@SuppressWarnings("all")
public final class NacosMeterRegistry {
private static final CompositeMeterRegistry METER_REGISTRY = new CompositeMeterRegistry();
public static DistributionSummary summary(String module, String name) {
ImmutableTag moduleTag = new ImmutableTag("module", module);
List<Tag> tags = new ArrayList<>();
tags.add(moduleTag);
tags.add(new ImmutableTag("name", name));
return METER_REGISTRY.summary("nacos_monitor", tags);
}
public static Timer timer(String module, String name) {
ImmutableTag moduleTag = new ImmutableTag("module", module);
List<Tag> tags = new ArrayList<>();
tags.add(moduleTag);
tags.add(new ImmutableTag("name", name));
return METER_REGISTRY.timer("nacos_monitor", tags);
}
public static CompositeMeterRegistry getMeterRegistry() {
return METER_REGISTRY;
}
}

View File

@ -0,0 +1,165 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.monitor;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.util.concurrent.ConcurrentHashMap;
/**
* Metrics unified usage center.
*
* @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/
@SuppressWarnings("all")
public final class NacosMeterRegistryCenter {
// stable registries.
public static final String CORE_STABLE_REGISTRY = "CORE_STABLE_REGISTRY";
public static final String CONFIG_STABLE_REGISTRY = "CONFIG_STABLE_REGISTRY";
public static final String NAMING_STABLE_REGISTRY = "NAMING_STABLE_REGISTRY";
// dynamic registries.
public static final String TOPN_CONFIG_CHANGE_REGISTRY = "TOPN_CONFIG_CHANGE_REGISTRY";
public static final String TOPN_SERVICE_CHANGE_REGISTRY = "TOPN_SERVICE_CHANGE_REGISTRY";
private static final ConcurrentHashMap<String, CompositeMeterRegistry> METER_REGISTRIES = new ConcurrentHashMap<>();
private static PrometheusMeterRegistry PROMETHEUS_METER_REGISTRY = null;
static {
try {
PROMETHEUS_METER_REGISTRY = ApplicationUtils.getBean(PrometheusMeterRegistry.class);
} catch (Throwable t) {
Loggers.CORE.warn("Metrics init failed :", t);
}
CompositeMeterRegistry compositeMeterRegistry;
compositeMeterRegistry = new CompositeMeterRegistry();
if (PROMETHEUS_METER_REGISTRY != null) {
compositeMeterRegistry.add(PROMETHEUS_METER_REGISTRY);
}
METER_REGISTRIES.put(CORE_STABLE_REGISTRY, compositeMeterRegistry);
compositeMeterRegistry = new CompositeMeterRegistry();
if (PROMETHEUS_METER_REGISTRY != null) {
compositeMeterRegistry.add(PROMETHEUS_METER_REGISTRY);
}
METER_REGISTRIES.put(CONFIG_STABLE_REGISTRY, compositeMeterRegistry);
compositeMeterRegistry = new CompositeMeterRegistry();
if (PROMETHEUS_METER_REGISTRY != null) {
compositeMeterRegistry.add(PROMETHEUS_METER_REGISTRY);
}
METER_REGISTRIES.put(NAMING_STABLE_REGISTRY, compositeMeterRegistry);
compositeMeterRegistry = new CompositeMeterRegistry();
if (PROMETHEUS_METER_REGISTRY != null) {
compositeMeterRegistry.add(PROMETHEUS_METER_REGISTRY);
}
METER_REGISTRIES.put(TOPN_CONFIG_CHANGE_REGISTRY, compositeMeterRegistry);
compositeMeterRegistry = new CompositeMeterRegistry();
if (PROMETHEUS_METER_REGISTRY != null) {
compositeMeterRegistry.add(PROMETHEUS_METER_REGISTRY);
}
METER_REGISTRIES.put(TOPN_SERVICE_CHANGE_REGISTRY, compositeMeterRegistry);
}
public static Counter counter(String registry, String name, Iterable<Tag> tags) {
CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
if (compositeMeterRegistry != null) {
return METER_REGISTRIES.get(registry).counter(name, tags);
}
return null;
}
public static Counter counter(String registry, String name, String... tags) {
CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
if (compositeMeterRegistry != null) {
return METER_REGISTRIES.get(registry).counter(name, tags);
}
return null;
}
public static <T extends Number> T gauge(String registry, String name, Iterable<Tag> tags, T number) {
CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
if (compositeMeterRegistry != null) {
return METER_REGISTRIES.get(registry).gauge(name, tags, number);
}
return null;
}
public static Timer timer(String registry, String name, Iterable<Tag> tags) {
CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
if (compositeMeterRegistry != null) {
return METER_REGISTRIES.get(registry).timer(name, tags);
}
return null;
}
public static Timer timer(String registry, String name, String... tags) {
CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
if (compositeMeterRegistry != null) {
return METER_REGISTRIES.get(registry).timer(name, tags);
}
return null;
}
public static DistributionSummary summary(String registry, String name, Iterable<Tag> tags) {
CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
if (compositeMeterRegistry != null) {
return METER_REGISTRIES.get(registry).summary(name, tags);
}
return null;
}
public static DistributionSummary summary(String registry, String name, String... tags) {
CompositeMeterRegistry compositeMeterRegistry = METER_REGISTRIES.get(registry);
if (compositeMeterRegistry != null) {
return METER_REGISTRIES.get(registry).summary(name, tags);
}
return null;
}
public static void clear(String registry) {
METER_REGISTRIES.get(registry).clear();
}
/**
* Just for test. Don't register meter by getMeterRegistry.
*
* @param registry
* @return CompositeMeterRegistry in NacosMeterRegistryCenter.
*/
public static CompositeMeterRegistry getMeterRegistry(String registry) {
return METER_REGISTRIES.get(registry);
}
}

View File

@ -16,26 +16,42 @@
package com.alibaba.nacos.core.monitor; package com.alibaba.nacos.core.monitor;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ConfigurableApplicationContext;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.Mockito.when;
/** /**
* {@link MetricsMonitor} and {@link NacosMeterRegistry} unit tests. * {@link MetricsMonitor} and {@link NacosMeterRegistryCenter} unit tests.
* *
* @author chenglu * @author chenglu
* @date 2021-06-15 22:58 * @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/ */
@RunWith(MockitoJUnitRunner.Silent.class)
public class MetricsMonitorTest { public class MetricsMonitorTest {
@Mock
private ConfigurableApplicationContext context;
@Before @Before
public void initMeterRegistry() { public void initMeterRegistry() {
NacosMeterRegistry.getMeterRegistry().add(new SimpleMeterRegistry()); ApplicationUtils.injectContext(context);
when(context.getBean(PrometheusMeterRegistry.class)).thenReturn(null);
// add simple meterRegistry.
NacosMeterRegistryCenter.getMeterRegistry(NacosMeterRegistryCenter.CORE_STABLE_REGISTRY)
.add(new SimpleMeterRegistry());
} }
@Test @Test
@ -63,7 +79,7 @@ public class MetricsMonitorTest {
raftApplyTimerLog.record(10, TimeUnit.SECONDS); raftApplyTimerLog.record(10, TimeUnit.SECONDS);
raftApplyTimerLog.record(20, TimeUnit.SECONDS); raftApplyTimerLog.record(20, TimeUnit.SECONDS);
Assert.assertEquals(0.5D, raftApplyTimerLog.totalTime(TimeUnit.MINUTES), 0.01); Assert.assertEquals(0.5D, raftApplyTimerLog.totalTime(TimeUnit.MINUTES), 0.01);
Assert.assertEquals(30D, raftApplyTimerLog.totalTime(TimeUnit.SECONDS), 0.01); Assert.assertEquals(30D, raftApplyTimerLog.totalTime(TimeUnit.SECONDS), 0.01);
} }
@ -73,7 +89,7 @@ public class MetricsMonitorTest {
raftApplyReadTimer.record(10, TimeUnit.SECONDS); raftApplyReadTimer.record(10, TimeUnit.SECONDS);
raftApplyReadTimer.record(20, TimeUnit.SECONDS); raftApplyReadTimer.record(20, TimeUnit.SECONDS);
Assert.assertEquals(0.5D, raftApplyReadTimer.totalTime(TimeUnit.MINUTES), 0.01); Assert.assertEquals(0.5D, raftApplyReadTimer.totalTime(TimeUnit.MINUTES), 0.01);
Assert.assertEquals(30D, raftApplyReadTimer.totalTime(TimeUnit.SECONDS), 0.01); Assert.assertEquals(30D, raftApplyReadTimer.totalTime(TimeUnit.SECONDS), 0.01);
} }
} }

View File

@ -18,15 +18,17 @@ package com.alibaba.nacos.naming.monitor;
import com.alibaba.nacos.naming.core.v2.pojo.BatchInstancePublishInfo; import com.alibaba.nacos.naming.core.v2.pojo.BatchInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo; import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.common.utils.TopnCounterMetricsContainer;
import com.alibaba.nacos.core.monitor.NacosMeterRegistryCenter;
import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Loggers;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.ImmutableTag; import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tag;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -37,6 +39,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/ */
public class MetricsMonitor { public class MetricsMonitor {
private static final String METER_REGISTRY = NacosMeterRegistryCenter.NAMING_STABLE_REGISTRY;
private static final MetricsMonitor INSTANCE = new MetricsMonitor(); private static final MetricsMonitor INSTANCE = new MetricsMonitor();
private final AtomicInteger mysqlHealthCheck = new AtomicInteger(); private final AtomicInteger mysqlHealthCheck = new AtomicInteger();
@ -65,6 +69,29 @@ public class MetricsMonitor {
private final AtomicInteger failedPush = new AtomicInteger(); private final AtomicInteger failedPush = new AtomicInteger();
private final AtomicInteger emptyPush = new AtomicInteger();
private final AtomicInteger serviceSubscribedEventQueueSize = new AtomicInteger();
private final AtomicInteger serviceChangedEventQueueSize = new AtomicInteger();
private final AtomicInteger pushPendingTaskCount = new AtomicInteger();
/**
* version -> naming subscriber count.
*/
private final ConcurrentHashMap<String, AtomicInteger> namingSubscriber = new ConcurrentHashMap<>();
/**
* version -> naming publisher count.
*/
private final ConcurrentHashMap<String, AtomicInteger> namingPublisher = new ConcurrentHashMap<>();
/**
* topn service change count.
*/
private final TopnCounterMetricsContainer serviceChangeCount = new TopnCounterMetricsContainer();
private MetricsMonitor() { private MetricsMonitor() {
for (Field each : MetricsMonitor.class.getDeclaredFields()) { for (Field each : MetricsMonitor.class.getDeclaredFields()) {
if (Number.class.isAssignableFrom(each.getType())) { if (Number.class.isAssignableFrom(each.getType())) {
@ -76,13 +103,35 @@ public class MetricsMonitor {
} }
} }
} }
namingSubscriber.put("v1", new AtomicInteger(0));
namingSubscriber.put("v2", new AtomicInteger(0));
List<Tag> tags = new ArrayList<>();
tags.add(new ImmutableTag("version", "v1"));
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_naming_subscriber", tags, namingSubscriber.get("v1"));
tags = new ArrayList<>();
tags.add(new ImmutableTag("version", "v2"));
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_naming_subscriber", tags, namingSubscriber.get("v2"));
namingPublisher.put("v1", new AtomicInteger(0));
namingPublisher.put("v2", new AtomicInteger(0));
tags = new ArrayList<>();
tags.add(new ImmutableTag("version", "v1"));
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_naming_publisher", tags, namingPublisher.get("v1"));
tags = new ArrayList<>();
tags.add(new ImmutableTag("version", "v2"));
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_naming_publisher", tags, namingPublisher.get("v2"));
} }
private <T extends Number> void registerToMetrics(String name, T number) { private <T extends Number> void registerToMetrics(String name, T number) {
List<Tag> tags = new ArrayList<>(); List<Tag> tags = new ArrayList<>();
tags.add(new ImmutableTag("module", "naming")); tags.add(new ImmutableTag("module", "naming"));
tags.add(new ImmutableTag("name", name)); tags.add(new ImmutableTag("name", name));
Metrics.gauge("nacos_monitor", tags, number); NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, number);
} }
public static AtomicInteger getMysqlHealthCheckMonitor() { public static AtomicInteger getMysqlHealthCheckMonitor() {
@ -129,14 +178,42 @@ public class MetricsMonitor {
return INSTANCE.failedPush; return INSTANCE.failedPush;
} }
public static AtomicInteger getEmptyPushMonitor() {
return INSTANCE.emptyPush;
}
public static AtomicInteger getTotalPushCountForAvg() { public static AtomicInteger getTotalPushCountForAvg() {
return INSTANCE.totalPushCountForAvg; return INSTANCE.totalPushCountForAvg;
} }
public static AtomicInteger getServiceSubscribedEventQueueSize() {
return INSTANCE.serviceSubscribedEventQueueSize;
}
public static AtomicInteger getServiceChangedEventQueueSize() {
return INSTANCE.serviceChangedEventQueueSize;
}
public static AtomicInteger getPushPendingTaskCount() {
return INSTANCE.pushPendingTaskCount;
}
public static AtomicLong getTotalPushCostForAvg() { public static AtomicLong getTotalPushCostForAvg() {
return INSTANCE.totalPushCostForAvg; return INSTANCE.totalPushCostForAvg;
} }
public static AtomicInteger getNamingSubscriber(String version) {
return INSTANCE.namingSubscriber.get(version);
}
public static AtomicInteger getNamingPublisher(String version) {
return INSTANCE.namingPublisher.get(version);
}
public static TopnCounterMetricsContainer getServiceChangeCount() {
return INSTANCE.serviceChangeCount;
}
public static void compareAndSetMaxPushCost(long newCost) { public static void compareAndSetMaxPushCost(long newCost) {
INSTANCE.maxPushCost.getAndUpdate((prev) -> Math.max(newCost, prev)); INSTANCE.maxPushCost.getAndUpdate((prev) -> Math.max(newCost, prev));
} }
@ -154,6 +231,10 @@ public class MetricsMonitor {
INSTANCE.failedPush.incrementAndGet(); INSTANCE.failedPush.incrementAndGet();
} }
public static void incrementEmptyPush() {
INSTANCE.emptyPush.incrementAndGet();
}
public static void incrementInstanceCount() { public static void incrementInstanceCount() {
INSTANCE.ipCount.incrementAndGet(); INSTANCE.ipCount.incrementAndGet();
} }
@ -170,12 +251,16 @@ public class MetricsMonitor {
INSTANCE.subscriberCount.decrementAndGet(); INSTANCE.subscriberCount.decrementAndGet();
} }
public static void incrementServiceChangeCount(String namespace, String group, String name) {
INSTANCE.serviceChangeCount.increment(namespace + "@" + group + "@" + name);
}
public static Counter getDiskException() { public static Counter getDiskException() {
return Metrics.counter("nacos_exception", "module", "naming", "name", "disk"); return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "naming", "name", "disk");
} }
public static Counter getLeaderSendBeatFailedException() { public static Counter getLeaderSendBeatFailedException() {
return Metrics.counter("nacos_exception", "module", "naming", "name", "leaderSendBeatFailed"); return NacosMeterRegistryCenter.counter(METER_REGISTRY, "nacos_exception", "module", "naming", "name", "leaderSendBeatFailed");
} }
/** /**
@ -214,6 +299,7 @@ public class MetricsMonitor {
public static void resetPush() { public static void resetPush() {
getTotalPushMonitor().set(0); getTotalPushMonitor().set(0);
getFailedPushMonitor().set(0); getFailedPushMonitor().set(0);
getEmptyPushMonitor().set(0);
getTotalPushCostForAvg().set(0); getTotalPushCostForAvg().set(0);
getTotalPushCountForAvg().set(0); getTotalPushCountForAvg().set(0);
getMaxPushCostMonitor().set(-1); getMaxPushCostMonitor().set(-1);

View File

@ -0,0 +1,64 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.monitor;
import com.alibaba.nacos.common.utils.Pair;
import com.alibaba.nacos.core.monitor.NacosMeterRegistryCenter;
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Tag;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* dynamic meter refresh service.
*
* @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/
@Service
public class NamingDynamicMeterRefreshService {
private static final String TOPN_SERVICE_CHANGE_REGISTRY = NacosMeterRegistryCenter.TOPN_SERVICE_CHANGE_REGISTRY;
private static final int SERVICE_CHANGE_N = 10;
/**
* refresh service change count top n per 30s.
*/
@Scheduled(cron = "0/30 * * * * *")
public void refreshTopnConfigChangeCount() {
NacosMeterRegistryCenter.clear(TOPN_SERVICE_CHANGE_REGISTRY);
List<Pair<String, AtomicInteger>> topnServiceChangeCount = MetricsMonitor.getServiceChangeCount()
.getTopNCounter(SERVICE_CHANGE_N);
for (Pair<String, AtomicInteger> serviceChangeCount : topnServiceChangeCount) {
List<Tag> tags = new ArrayList<>();
tags.add(new ImmutableTag("service", serviceChangeCount.getFirst()));
NacosMeterRegistryCenter.gauge(TOPN_SERVICE_CHANGE_REGISTRY, "service_change_count", tags, serviceChangeCount.getSecond());
}
}
/**
* reset service change count to 0 every week.
*/
@Scheduled(cron = "0 0 0 ? * 1")
public void resetTopnServiceChangeCount() {
MetricsMonitor.getServiceChangeCount().removeAll();
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.monitor.collector;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.ConnectionBasedClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.EphemeralIpPortClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.PersistentIpPortClientManager;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* v1 and v2 naming subscriber and publisher metrics collector.
*
* @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/
@Service
public class NamingSubAndPubMetricsCollector {
private static final long DELAY_SECONDS = 5;
private static ScheduledExecutorService executorService = ExecutorFactory.newSingleScheduledExecutorService(r -> {
Thread thread = new Thread(r, "nacos.naming.monitor.NamingSubAndPubMetricsCollector");
thread.setDaemon(true);
return thread;
});
@Autowired
public NamingSubAndPubMetricsCollector(ConnectionBasedClientManager connectionBasedClientManager,
EphemeralIpPortClientManager ephemeralIpPortClientManager, PersistentIpPortClientManager persistentIpPortClientManager) {
executorService.scheduleWithFixedDelay(() -> {
int v1SubscriberCount = 0;
int v1PublisherCount = 0;
for (String clientId : ephemeralIpPortClientManager.allClientId()) {
Client client = ephemeralIpPortClientManager.getClient(clientId);
if (null != client) {
v1PublisherCount += client.getAllPublishedService().size();
v1SubscriberCount += client.getAllSubscribeService().size();
}
}
for (String clientId : persistentIpPortClientManager.allClientId()) {
Client client = persistentIpPortClientManager.getClient(clientId);
if (null != client) {
v1PublisherCount += client.getAllPublishedService().size();
v1SubscriberCount += client.getAllSubscribeService().size();
}
}
MetricsMonitor.getNamingSubscriber("v1").set(v1SubscriberCount);
MetricsMonitor.getNamingPublisher("v1").set(v1PublisherCount);
int v2SubscriberCount = 0;
int v2PublisherCount = 0;
for (String clientId : connectionBasedClientManager.allClientId()) {
Client client = connectionBasedClientManager.getClient(clientId);
if (null != client) {
v2PublisherCount += client.getAllPublishedService().size();
v2SubscriberCount += client.getAllSubscribeService().size();
}
}
MetricsMonitor.getNamingSubscriber("v2").set(v2SubscriberCount);
MetricsMonitor.getNamingPublisher("v2").set(v2PublisherCount);
}, DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.monitor.collector;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* pending push task metrics collector.
*
* @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/
@Service
public class PushPendingTaskCountMetricsCollector {
private static final long DELAY_SECONDS = 2;
private static ScheduledExecutorService executorService = ExecutorFactory.newSingleScheduledExecutorService(r -> {
Thread thread = new Thread(r, "nacos.naming.monitor.PushPendingTaskCountMetricsCollector");
thread.setDaemon(true);
return thread;
});
@Autowired
public PushPendingTaskCountMetricsCollector(NamingSubscriberServiceV2Impl namingSubscriberServiceV2) {
executorService.scheduleWithFixedDelay(() -> {
MetricsMonitor.getPushPendingTaskCount().set(namingSubscriberServiceV2.getPushPendingTaskCount());
}, DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.monitor.collector;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import org.springframework.stereotype.Service;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* ServiceEvent queue size metrics collector.
*
* @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/
@Service
public class ServiceEventQueueSizeMetricsCollector {
private static final long DELAY_SECONDS = 2;
private static ScheduledExecutorService executorService = ExecutorFactory.newSingleScheduledExecutorService(r -> {
Thread thread = new Thread(r, "nacos.naming.monitor.ServiceEventQueueSizeMetricsCollector");
thread.setDaemon(true);
return thread;
});
public ServiceEventQueueSizeMetricsCollector() {
executorService.scheduleWithFixedDelay(() -> {
MetricsMonitor.getServiceSubscribedEventQueueSize().set(
(int) NotifyCenter.getPublisher(ServiceEvent.ServiceSubscribedEvent.class).currentEventSize());
MetricsMonitor.getServiceChangedEventQueueSize().set(
(int) NotifyCenter.getPublisher(ServiceEvent.ServiceChangedEvent.class).currentEventSize());
}, DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);
}
}

View File

@ -29,6 +29,7 @@ import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager; import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.NamingSubscriberService; import com.alibaba.nacos.naming.push.NamingSubscriberService;
import com.alibaba.nacos.naming.push.v2.executor.PushExecutorDelegate; import com.alibaba.nacos.naming.push.v2.executor.PushExecutorDelegate;
@ -117,6 +118,7 @@ public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements Na
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event; ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService(); Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay())); delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) { } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// If service is subscribed by one client, only push this client. // If service is subscribed by one client, only push this client.
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event; ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
@ -130,4 +132,8 @@ public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements Na
Collection<Service> services = indexesManager.getSubscribedService(); Collection<Service> services = indexesManager.getSubscribedService();
return services.size() > PARALLEL_SIZE ? services.parallelStream() : services.stream(); return services.size() > PARALLEL_SIZE ? services.parallelStream() : services.stream();
} }
public int getPushPendingTaskCount() {
return delayTaskEngine.size();
}
} }

View File

@ -32,6 +32,9 @@ public class NacosMonitorPushResultHook implements PushResultHook {
MetricsMonitor.incrementPush(); MetricsMonitor.incrementPush();
MetricsMonitor.incrementPushCost(result.getAllCost()); MetricsMonitor.incrementPushCost(result.getAllCost());
MetricsMonitor.compareAndSetMaxPushCost(result.getAllCost()); MetricsMonitor.compareAndSetMaxPushCost(result.getAllCost());
if (null == result.getData().getHosts() || !result.getData().validate()) {
MetricsMonitor.incrementEmptyPush();
}
if (isRpc(result.getSubscriber())) { if (isRpc(result.getSubscriber())) {
NamingTpsMonitor.rpcPushSuccess(result.getSubscribeClientId(), result.getSubscriber().getIp()); NamingTpsMonitor.rpcPushSuccess(result.getSubscribeClientId(), result.getSubscriber().getIp());
} else { } else {

View File

@ -16,15 +16,34 @@
package com.alibaba.nacos.naming.monitor; package com.alibaba.nacos.naming.monitor;
import com.alibaba.nacos.core.monitor.NacosMeterRegistryCenter;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ConfigurableApplicationContext;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.Silent.class)
public class MetricsMonitorTest { public class MetricsMonitorTest {
@Mock
private ConfigurableApplicationContext context;
@Before @Before
public void setUp() { public void setUp() {
ApplicationUtils.injectContext(context);
when(context.getBean(PrometheusMeterRegistry.class)).thenReturn(null);
// add simple meterRegistry.
NacosMeterRegistryCenter.getMeterRegistry(NacosMeterRegistryCenter.NAMING_STABLE_REGISTRY)
.add(new SimpleMeterRegistry());
MetricsMonitor.resetPush(); MetricsMonitor.resetPush();
} }

View File

@ -0,0 +1,102 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push.v2.hook;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.core.remote.control.TpsMonitorManager;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ConfigurableApplicationContext;
import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
/**
* test for NacosMonitorPushResultHook.
*
* @author <a href="mailto:liuyixiao0821@gmail.com">liuyixiao</a>
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class NacosMonitorPushResultHookTest {
@Mock
private PushResult pushResult;
@Mock
private Subscriber subscriber;
@Mock
private TpsMonitorManager tpsMonitorManager;
@Mock
private ConfigurableApplicationContext context;
@Mock
private Instance instance;
private final ServiceInfo serviceInfo = new ServiceInfo("name", "cluster");
private final long allCost = 100L;
@Before
public void setUp() {
MetricsMonitor.resetAll();
serviceInfo.setHosts(new ArrayList<>());
subscriber.setIp("0.0.0.0");
when(instance.getWeight()).thenReturn(1.0);
when(instance.isHealthy()).thenReturn(true);
when(pushResult.getAllCost()).thenReturn(allCost);
when(pushResult.getData()).thenReturn(serviceInfo);
when(pushResult.getSubscriber()).thenReturn(subscriber);
ApplicationUtils.injectContext(context);
when(context.getBean(TpsMonitorManager.class)).thenReturn(tpsMonitorManager);
}
@Test
public void testPushSuccessForEmptyPush() {
new NacosMonitorPushResultHook().pushSuccess(pushResult);
assertEquals(1, MetricsMonitor.getTotalPushMonitor().get());
assertEquals(1, MetricsMonitor.getEmptyPushMonitor().get());
assertEquals(allCost, MetricsMonitor.getMaxPushCostMonitor().get());
}
@Test
public void testPushSuccessForNoEmptyPush() {
ArrayList<Instance> hosts = new ArrayList<>();
hosts.add(instance);
serviceInfo.setHosts(hosts);
new NacosMonitorPushResultHook().pushSuccess(pushResult);
assertEquals(1, MetricsMonitor.getTotalPushMonitor().get());
assertEquals(0, MetricsMonitor.getEmptyPushMonitor().get());
assertEquals(allCost, MetricsMonitor.getMaxPushCostMonitor().get());
}
@Test
public void testPushFailed() {
new NacosMonitorPushResultHook().pushFailed(pushResult);
assertEquals(1, MetricsMonitor.getFailedPushMonitor().get());
}
}