Support metric for module connection (#11426)
* support metric for module connection * fix pmd * add ut
This commit is contained in:
parent
2fd002c8d1
commit
18a4672dd9
@ -23,6 +23,9 @@ import io.micrometer.core.instrument.Timer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
@ -43,7 +46,9 @@ public final class MetricsMonitor {
|
||||
private static final Timer RAFT_APPLY_READ_TIMER;
|
||||
|
||||
private static AtomicInteger longConnection = new AtomicInteger();
|
||||
|
||||
|
||||
private static Map<String, AtomicInteger> moduleConnectionCnt = new ConcurrentHashMap<>();
|
||||
|
||||
static {
|
||||
ImmutableTag immutableTag = new ImmutableTag("module", "core");
|
||||
List<Tag> tags = new ArrayList<>();
|
||||
@ -100,4 +105,46 @@ public final class MetricsMonitor {
|
||||
public static DistributionSummary getRaftFromLeader() {
|
||||
return RAFT_FROM_LEADER;
|
||||
}
|
||||
|
||||
/**
|
||||
* refresh all module connection count.
|
||||
*
|
||||
* @param connectionCnt new connection count.
|
||||
*/
|
||||
public static void refreshModuleConnectionCount(Map<String, Integer> connectionCnt) {
|
||||
// refresh all existed module connection cnt and add new module connection count
|
||||
connectionCnt.forEach((module, cnt) -> {
|
||||
AtomicInteger integer = moduleConnectionCnt.get(module);
|
||||
// if exists
|
||||
if (integer != null) {
|
||||
integer.set(cnt);
|
||||
} else {
|
||||
// new module comes
|
||||
AtomicInteger newModuleConnCnt = new AtomicInteger(cnt);
|
||||
moduleConnectionCnt.put(module, newModuleConnCnt);
|
||||
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor",
|
||||
Arrays.asList(
|
||||
new ImmutableTag("module", module),
|
||||
new ImmutableTag("name", "longConnection")
|
||||
),
|
||||
moduleConnectionCnt.get(module));
|
||||
}
|
||||
});
|
||||
// reset the outdated module connection cnt
|
||||
moduleConnectionCnt.forEach((module, cnt) -> {
|
||||
if (connectionCnt.containsKey(module)) {
|
||||
return;
|
||||
}
|
||||
cnt.set(0);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* getter.
|
||||
*
|
||||
* @return moduleConnectionCnt.
|
||||
*/
|
||||
public static Map<String, AtomicInteger> getModuleConnectionCnt() {
|
||||
return moduleConnectionCnt;
|
||||
}
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ import com.alibaba.nacos.plugin.control.configs.ControlConfigs;
|
||||
import com.alibaba.nacos.plugin.control.connection.request.ConnectionCheckRequest;
|
||||
import com.alibaba.nacos.plugin.control.connection.response.ConnectionCheckResponse;
|
||||
import com.alibaba.nacos.plugin.control.connection.rule.ConnectionControlRule;
|
||||
import com.alibaba.nacos.sys.env.EnvUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@ -252,7 +253,18 @@ public class ConnectionManager {
|
||||
runtimeConnectionEjector.doEject();
|
||||
MetricsMonitor.getLongConnectionMonitor().set(connections.size());
|
||||
}, 1000L, 3000L, TimeUnit.MILLISECONDS);
|
||||
|
||||
|
||||
Boolean enabled = EnvUtil.getProperty("nacos.metric.grpc.server.connection.enabled", Boolean.class, true);
|
||||
if (enabled) {
|
||||
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(() -> {
|
||||
Map<String, Integer> count = new HashMap<>(16);
|
||||
connections.forEach((id, connection) -> {
|
||||
String module = connection.getLabels().getOrDefault(RemoteConstants.LABEL_MODULE, "unknown");
|
||||
count.put(module, count.getOrDefault(module, 0) + 1);
|
||||
});
|
||||
MetricsMonitor.refreshModuleConnectionCount(count);
|
||||
}, 1L, EnvUtil.getProperty("nacos.metric.grpc.server.connection.interval", Long.class, 15L), TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
public void loadCount(int loadClient, String redirectAddress) {
|
||||
|
@ -28,6 +28,8 @@ import org.mockito.Mock;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@ -92,4 +94,31 @@ public class MetricsMonitorTest {
|
||||
|
||||
Assert.assertEquals(30D, raftApplyReadTimer.totalTime(TimeUnit.SECONDS), 0.01);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshModuleConnectionCount() {
|
||||
// refresh
|
||||
Map<String, Integer> map = new HashMap<>();
|
||||
map.put("naming", 10);
|
||||
MetricsMonitor.refreshModuleConnectionCount(map);
|
||||
Assert.assertEquals(1, MetricsMonitor.getModuleConnectionCnt().size());
|
||||
Assert.assertEquals(10, MetricsMonitor.getModuleConnectionCnt().get("naming").get());
|
||||
|
||||
// refresh again
|
||||
map = new HashMap<>();
|
||||
map.put("naming", 11);
|
||||
map.put("config", 1);
|
||||
MetricsMonitor.refreshModuleConnectionCount(map);
|
||||
Assert.assertEquals(2, MetricsMonitor.getModuleConnectionCnt().size());
|
||||
Assert.assertEquals(11, MetricsMonitor.getModuleConnectionCnt().get("naming").get());
|
||||
Assert.assertEquals(1, MetricsMonitor.getModuleConnectionCnt().get("config").get());
|
||||
|
||||
// refresh again
|
||||
map = new HashMap<>();
|
||||
map.put("naming", 1);
|
||||
MetricsMonitor.refreshModuleConnectionCount(map);
|
||||
Assert.assertEquals(2, MetricsMonitor.getModuleConnectionCnt().size());
|
||||
Assert.assertEquals(1, MetricsMonitor.getModuleConnectionCnt().get("naming").get());
|
||||
Assert.assertEquals(0, MetricsMonitor.getModuleConnectionCnt().get("config").get());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user