tps monitor key use self define submit;active detection both on client and server (#4767)

* tps monitor key use self define submit

* pmd and check style.

* active detection of client and server

* connection manager ,active send interval increase
This commit is contained in:
nov.lzf 2021-01-21 14:40:07 +08:00 committed by GitHub
parent 91bf227c4a
commit 4e7346db1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 856 additions and 157 deletions

View File

@ -0,0 +1,32 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.request;
/**
* client active detection request from server.
*
* @author liuzunfei
* @version $Id: ClientDetectionRequest.java, v 0.1 2021年01月20日 2:42 PM liuzunfei Exp $
*/
public class ClientDetectionRequest extends ServerRequest {
@Override
public String getModule() {
return "internal";
}
}

View File

@ -17,10 +17,10 @@
package com.alibaba.nacos.api.remote.request; package com.alibaba.nacos.api.remote.request;
/** /**
* ConnectResetResponse. * ConnectResetRequest.
* *
* @author liuzunfei * @author liuzunfei
* @version $Id: ConnectResetResponse.java, v 0.1 2020年07月15日 11:11 AM liuzunfei Exp $ * @version $Id: ConnectResetRequest.java, v 0.1 2020年07月15日 11:11 AM liuzunfei Exp $
*/ */
public class ConnectResetRequest extends ServerRequest { public class ConnectResetRequest extends ServerRequest {

View File

@ -0,0 +1,27 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.response;
/**
* response of client active detection check.
*
* @author liuzunfei
* @version $Id: ServerCheckResponse.java, v 0.1 2021年01月20日 10:37 PM liuzunfei Exp $
*/
public class ClientDetectionResponse extends Response {
}

View File

@ -1,54 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.response;
/**
* PlainBodyResponse.
*
* @author liuzunfei
* @version $Id: PlainBodyResponse.java, v 0.1 2020年07月15日 1:37 PM liuzunfei Exp $
*/
public class PlainBodyResponse extends Response {
private String bodyString;
public PlainBodyResponse() {
}
public PlainBodyResponse(String bodyString) {
this.bodyString = bodyString;
}
/**
* Getter method for property <tt>bodyString</tt>.
*
* @return property value of bodyString
*/
public String getBodyString() {
return bodyString;
}
/**
* Setter method for property <tt>bodyString</tt>.
*
* @param bodyString value to be assigned to property bodyString
*/
public void setBodyString(String bodyString) {
this.bodyString = bodyString;
}
}

View File

@ -21,9 +21,11 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.PayloadRegistry; import com.alibaba.nacos.api.remote.PayloadRegistry;
import com.alibaba.nacos.api.remote.RequestCallBack; import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RequestFuture; import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.request.ClientDetectionRequest;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest; import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.HealthCheckRequest; import com.alibaba.nacos.api.remote.request.HealthCheckRequest;
import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.ClientDetectionResponse;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse; import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.response.ErrorResponse; import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.remote.response.Response;
@ -45,6 +47,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR; import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR;
@ -81,6 +84,13 @@ public abstract class RpcClient implements Closeable {
private static final long DEFAULT_TIMEOUT_MILLS = 3000L; private static final long DEFAULT_TIMEOUT_MILLS = 3000L;
/**
* default keep alive time 10s.
*/
private long keepAliveTime = 10000L;
private long lastActiveTimeStamp = System.currentTimeMillis();
/** /**
* listener called where connection's status changed. * listener called where connection's status changed.
*/ */
@ -182,15 +192,16 @@ public abstract class RpcClient implements Closeable {
* *
* @param serverListFactory serverListFactory * @param serverListFactory serverListFactory
*/ */
public void init(ServerListFactory serverListFactory) { public RpcClient init(ServerListFactory serverListFactory) {
if (!isWaitInitiated()) { if (!isWaitInitiated()) {
return; return this;
} }
this.serverListFactory = serverListFactory; this.serverListFactory = serverListFactory;
rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED); rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED);
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init, ServerListFactory ={}", name, LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init, ServerListFactory ={}", name,
serverListFactory.getClass().getName()); serverListFactory.getClass().getName());
return this;
} }
/** /**
@ -198,9 +209,21 @@ public abstract class RpcClient implements Closeable {
* *
* @param labels labels * @param labels labels
*/ */
public void initLabels(Map<String, String> labels) { public RpcClient initLabels(Map<String, String> labels) {
this.labels.putAll(labels); this.labels.putAll(labels);
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init label, labels={}", name, this.labels); LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init label, labels={}", name, this.labels);
return this;
}
/**
* init keepalive time.
*
* @param keepAliveTime keepAliveTime
* @param timeUnit timeUnit
*/
public RpcClient initKeepAlive(long keepAliveTime, TimeUnit timeUnit) {
this.keepAliveTime = keepAliveTime * timeUnit.toMillis(keepAliveTime);
return this;
} }
/** /**
@ -248,7 +271,37 @@ public abstract class RpcClient implements Closeable {
public void run() { public void run() {
while (true) { while (true) {
try { try {
ReconnectContext reconnectContext = reconnectionSignal.take(); ReconnectContext reconnectContext = reconnectionSignal
.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
//check alive time.
long now = System.currentTimeMillis();
if (now - lastActiveTimeStamp >= keepAliveTime) {
boolean isHealthy = healthCheck();
if (!isHealthy) {
if (currentConnection == null) {
continue;
}
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}]Server healthy check fail,currentConnection={}", name,
currentConnection.getConnectionId());
if (rpcClientStatus
.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
reconnectContext = new ReconnectContext(null, false);
} else {
continue;
}
} else {
lastActiveTimeStamp = now;
continue;
}
} else {
continue;
}
}
if (reconnectContext.serverInfo != null) { if (reconnectContext.serverInfo != null) {
//clear recommend server if server is not in server list. //clear recommend server if server is not in server list.
String address = reconnectContext.serverInfo.serverIp + Constants.COLON String address = reconnectContext.serverInfo.serverIp + Constants.COLON
@ -311,6 +364,18 @@ public abstract class RpcClient implements Closeable {
} }
registerServerRequestHandler(new ConnectResetRequestHandler()); registerServerRequestHandler(new ConnectResetRequestHandler());
//register client detection request.
registerServerRequestHandler(new ServerRequestHandler() {
@Override
public Response requestReply(Request request) {
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse();
}
return null;
}
});
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {
@Override @Override
public void run() { public void run() {
@ -367,6 +432,9 @@ public abstract class RpcClient implements Closeable {
private boolean healthCheck() { private boolean healthCheck() {
HealthCheckRequest healthCheckRequest = new HealthCheckRequest(); HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
if (this.currentConnection == null) {
return false;
}
try { try {
Response response = this.currentConnection.request(healthCheckRequest, 3000L); Response response = this.currentConnection.request(healthCheckRequest, 3000L);
//not only check server is ok ,also check connection is register. //not only check server is ok ,also check connection is register.
@ -567,6 +635,7 @@ public abstract class RpcClient implements Closeable {
throw new NacosException(response.getErrorCode(), response.getMessage()); throw new NacosException(response.getErrorCode(), response.getMessage());
} }
// return response. // return response.
lastActiveTimeStamp = System.currentTimeMillis();
return response; return response;
} catch (Exception e) { } catch (Exception e) {
@ -619,6 +688,7 @@ public abstract class RpcClient implements Closeable {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected."); throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected.");
} }
this.currentConnection.asyncRequest(request, callback); this.currentConnection.asyncRequest(request, callback);
lastActiveTimeStamp = System.currentTimeMillis();
return; return;
} catch (Exception e) { } catch (Exception e) {
if (waitReconnect) { if (waitReconnect) {
@ -668,6 +738,7 @@ public abstract class RpcClient implements Closeable {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected."); throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected.");
} }
RequestFuture requestFuture = this.currentConnection.requestFuture(request); RequestFuture requestFuture = this.currentConnection.requestFuture(request);
lastActiveTimeStamp = System.currentTimeMillis();
return requestFuture; return requestFuture;
} catch (Exception e) { } catch (Exception e) {
if (waitReconnect) { if (waitReconnect) {
@ -718,6 +789,7 @@ public abstract class RpcClient implements Closeable {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]receive server push request,request={},requestId={}", name, LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]receive server push request,request={},requestId={}", name,
request.getClass().getSimpleName(), request.getRequestId()); request.getClass().getSimpleName(), request.getRequestId());
lastActiveTimeStamp = System.currentTimeMillis();
for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) { for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
try { try {
Response response = serverRequestHandler.requestReply(request); Response response = serverRequestHandler.requestReply(request);

View File

@ -0,0 +1,60 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.remote;
import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
import com.alibaba.nacos.config.server.utils.GroupKey;
import com.alibaba.nacos.core.remote.control.MonitorKey;
import com.alibaba.nacos.core.remote.control.MonitorKeyParser;
/**
* Parser to get group key of config query request.
*
* @author liuzunfei
* @version $Id: ConfigPublishGroupKeyParser.java, v 0.1 2021年01月20日 20:38 PM liuzunfei Exp $
*/
public class ConfigPublishGroupKeyParser extends MonitorKeyParser {
/**
* parse group.
*
* @param args parameters.
* @return
*/
public MonitorKey parse(Object... args) {
if (args != null && args.length != 0 && args[0] instanceof ConfigPublishRequest) {
return new ConfigGroupKey(GroupKey.getKeyTenant(((ConfigPublishRequest) args[0]).getDataId(),
((ConfigPublishRequest) args[0]).getGroup(), ((ConfigPublishRequest) args[0]).getTenant()));
} else {
return null;
}
}
class ConfigGroupKey extends MonitorKey {
public ConfigGroupKey(String key) {
this.setKey(key);
}
@Override
public String getType() {
return "groupKey";
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.remote;
import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
import com.alibaba.nacos.core.remote.control.MonitorKey;
import com.alibaba.nacos.core.remote.control.MonitorKeyParser;
/**
* parse to get group from config publish parser.
*
* @author liuzunfei
* @version $Id: ConfigPublishGroupParser.java, v 0.1 2021年01月20日 20:38 PM liuzunfei Exp $
*/
public class ConfigPublishGroupParser extends MonitorKeyParser {
/**
* parse group.
*
* @param args parameters.
* @return
*/
public MonitorKey parse(Object... args) {
if (args != null && args.length != 0 && args[0] instanceof ConfigPublishRequest) {
return new ConfigGroupKey(((ConfigPublishRequest) args[0]).getGroup());
} else {
return null;
}
}
class ConfigGroupKey extends MonitorKey {
public ConfigGroupKey(String key) {
this.setKey(key);
}
@Override
public String getType() {
return "group";
}
}
}

View File

@ -59,7 +59,8 @@ public class ConfigPublishRequestHandler extends RequestHandler<ConfigPublishReq
} }
@Override @Override
@TpsControl(pointName = "ConfigPublish") @TpsControl(pointName = "ConfigPublish", parsers = {ConfigPublishGroupKeyParser.class,
ConfigPublishGroupParser.class})
@Secured(action = ActionTypes.WRITE, resource = "", parser = ConfigResourceParser.class) @Secured(action = ActionTypes.WRITE, resource = "", parser = ConfigResourceParser.class)
public ConfigPublishResponse handle(ConfigPublishRequest request, RequestMeta meta) throws NacosException { public ConfigPublishResponse handle(ConfigPublishRequest request, RequestMeta meta) throws NacosException {

View File

@ -0,0 +1,60 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.remote;
import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
import com.alibaba.nacos.config.server.utils.GroupKey;
import com.alibaba.nacos.core.remote.control.MonitorKey;
import com.alibaba.nacos.core.remote.control.MonitorKeyParser;
/**
* ConfigQueryGroupKeyParser.
*
* @author liuzunfei
* @version $Id: ConfigQueryGroupKeyParser.java, v 0.1 2021年01月20日 20:38 PM liuzunfei Exp $
*/
public class ConfigQueryGroupKeyParser extends MonitorKeyParser {
/**
* parse group key.
*
* @param args parameters.
* @return
*/
public MonitorKey parse(Object... args) {
if (args != null && args.length != 0 && args[0] instanceof ConfigQueryRequest) {
return new ConfigGroupKey(GroupKey.getKeyTenant(((ConfigQueryRequest) args[0]).getDataId(),
((ConfigQueryRequest) args[0]).getGroup(), ((ConfigQueryRequest) args[0]).getTenant()));
} else {
return null;
}
}
class ConfigGroupKey extends MonitorKey {
public ConfigGroupKey(String key) {
this.setKey(key);
}
@Override
public String getType() {
return "groupKey";
}
}
}

View File

@ -0,0 +1,55 @@
package com.alibaba.nacos.config.server.remote;
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
import com.alibaba.nacos.core.remote.control.MonitorKey;
import com.alibaba.nacos.core.remote.control.MonitorKeyParser;
/**
* parse to get group from config query parser.
*
* @author liuzunfei
* @version $Id: ConfigQueryGroupParser.java, v 0.1 2021年01月20日 20:38 PM liuzunfei Exp $
*/
public class ConfigQueryGroupParser extends MonitorKeyParser {
/**
* parse group.
*
* @param args parameters.
* @return
*/
public MonitorKey parse(Object... args) {
if (args != null && args.length != 0 && args[0] instanceof ConfigQueryRequest) {
return new ConfigGroupKey(((ConfigQueryRequest) args[0]).getGroup());
} else {
return null;
}
}
class ConfigGroupKey extends MonitorKey {
public ConfigGroupKey(String key) {
this.setKey(key);
}
@Override
public String getType() {
return "group";
}
}
}

View File

@ -73,7 +73,7 @@ public class ConfigQueryRequestHandler extends RequestHandler<ConfigQueryRequest
} }
@Override @Override
@TpsControl(pointName = "ConfigQuery") @TpsControl(pointName = "ConfigQuery", parsers = {ConfigQueryGroupKeyParser.class, ConfigQueryGroupParser.class})
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public ConfigQueryResponse handle(ConfigQueryRequest configQueryRequest, RequestMeta requestMeta) public ConfigQueryResponse handle(ConfigQueryRequest configQueryRequest, RequestMeta requestMeta)
throws NacosException { throws NacosException {

View File

@ -161,18 +161,18 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
@Override @Override
public void run() { public void run() {
tryTimes++; tryTimes++;
if (!tpsMonitorManager.applyTps(clientIp, POINT_CONFIG_PUSH)) { if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {
push(this); push(this);
} else { } else {
rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) { rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {
@Override @Override
public void onSuccess() { public void onSuccess() {
tpsMonitorManager.applyTps(clientIp, POINT_CONFIG_PUSH_SUCCESS); tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);
} }
@Override @Override
public void onFail(Throwable e) { public void onFail(Throwable e) {
tpsMonitorManager.applyTps(clientIp, POINT_CONFIG_PUSH_FAIL); tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);
Loggers.REMOTE_PUSH.warn("Push fail", e); Loggers.REMOTE_PUSH.warn("Push fail", e);
push(RpcPushTask.this); push(RpcPushTask.this);
} }

View File

@ -19,9 +19,12 @@ package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RemoteConstants; import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RpcScheduledExecutor; import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
import com.alibaba.nacos.api.remote.request.ClientDetectionRequest;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest; import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.utils.NetUtils; import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.NotifyCenter;
@ -54,6 +57,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -68,6 +73,8 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
public static final String RULE_FILE_NAME = "limitRule"; public static final String RULE_FILE_NAME = "limitRule";
private static final long KEEP_ALIVE_TIME = 15000L;
@Autowired @Autowired
private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry; private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;
@ -212,7 +219,7 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
connectionForClientIp.remove(clientIp); connectionForClientIp.remove(clientIp);
} }
remove.close(); remove.close();
Loggers.REMOTE_DIGEST.info(" connection unregistered successfully,connectionId = {} ", connectionId); Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
} }
} }
@ -277,18 +284,28 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
@Override @Override
public void run() { public void run() {
try { try {
MetricsMonitor.getLongConnectionMonitor().set(connections.size());
int totalCount = connections.size();
Loggers.REMOTE_DIGEST.info("Connection check task start");
MetricsMonitor.getLongConnectionMonitor().set(totalCount);
Set<Map.Entry<String, Connection>> entries = connections.entrySet(); Set<Map.Entry<String, Connection>> entries = connections.entrySet();
int currentSdkClientCount = currentSdkClientCount(); int currentSdkClientCount = currentSdkClientCount();
boolean isLoaderClient = loadClient >= 0; boolean isLoaderClient = loadClient >= 0;
int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit; int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;
int expelCount = currentMaxClient < 0 ? currentMaxClient : currentSdkClientCount - currentMaxClient; int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);
Loggers.REMOTE_DIGEST
.info("Total count ={}, sdkCount={},clusterCount={}, currentLimit={}, toExpelCount={}",
totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount),
currentMaxClient + (isLoaderClient ? "(loaderCount)" : ""), expelCount);
List<String> expelClient = new LinkedList<>(); List<String> expelClient = new LinkedList<>();
Map<String, AtomicInteger> expelForIp = new HashMap<>(16); Map<String, AtomicInteger> expelForIp = new HashMap<>(16);
//1. calculate expel count of ip. //1. calculate expel count of ip.
for (Map.Entry<String, Connection> entry : entries) { for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue(); Connection client = entry.getValue();
String appName = client.getMetaInfo().getAppName(); String appName = client.getMetaInfo().getAppName();
String clientIp = client.getMetaInfo().getClientIp(); String clientIp = client.getMetaInfo().getClientIp();
@ -312,6 +329,15 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
} }
} }
Loggers.REMOTE_DIGEST
.info("Check over limit for ip limit rule, over limit ip count={}", expelForIp.size());
if (expelForIp.size() > 0) {
Loggers.REMOTE_DIGEST.info("Over limit ip expel info,", expelForIp);
}
Set<String> outDatedConnections = new HashSet<>();
long now = System.currentTimeMillis();
//2.get expel connection for ip limit. //2.get expel connection for ip limit.
for (Map.Entry<String, Connection> entry : entries) { for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue(); Connection client = entry.getValue();
@ -321,6 +347,8 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
integer.decrementAndGet(); integer.decrementAndGet();
expelClient.add(client.getMetaInfo().getConnectionId()); expelClient.add(client.getMetaInfo().getConnectionId());
expelCount--; expelCount--;
} else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
outDatedConnections.add(client.getMetaInfo().getConnectionId());
} }
} }
@ -333,24 +361,29 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
.isSdkSource() && expelCount > 0) { .isSdkSource() && expelCount > 0) {
expelClient.add(client.getMetaInfo().getConnectionId()); expelClient.add(client.getMetaInfo().getConnectionId());
expelCount--; expelCount--;
outDatedConnections.remove(client.getMetaInfo().getConnectionId());
} }
} }
} }
ConnectResetRequest connectResetRequest = new ConnectResetRequest(); String serverIp = null;
String serverPort = null;
if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) { if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
String[] split = redirectAddress.split(Constants.COLON); String[] split = redirectAddress.split(Constants.COLON);
connectResetRequest.setServerIp(split[0]); serverIp = split[0];
connectResetRequest.setServerPort(split[1]); serverPort = split[1];
} }
for (String expelledClientId : expelClient) { for (String expelledClientId : expelClient) {
try { try {
Connection connection = getConnection(expelledClientId); Connection connection = getConnection(expelledClientId);
if (connection != null) { if (connection != null) {
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
connectResetRequest.setServerIp(serverIp);
connectResetRequest.setServerPort(serverPort);
connection.asyncRequest(connectResetRequest, null); connection.asyncRequest(connectResetRequest, null);
Loggers.REMOTE Loggers.REMOTE_DIGEST
.info("send connection reset server , connection id = {},recommendServerIp={}, recommendServerPort={}", .info("Send connection reset request , connection id = {},recommendServerIp={}, recommendServerPort={}",
expelledClientId, connectResetRequest.getServerIp(), expelledClientId, connectResetRequest.getServerIp(),
connectResetRequest.getServerPort()); connectResetRequest.getServerPort());
} }
@ -358,18 +391,86 @@ public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent
} catch (ConnectionAlreadyClosedException e) { } catch (ConnectionAlreadyClosedException e) {
unregister(expelledClientId); unregister(expelledClientId);
} catch (Exception e) { } catch (Exception e) {
Loggers.REMOTE.error("error occurs when expel connection :", expelledClientId, e); Loggers.REMOTE_DIGEST.error("Error occurs when expel connection :", expelledClientId, e);
}
}
//4.client active detection.
Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
if (CollectionUtils.isNotEmpty(outDatedConnections)) {
Set<String> successConnections = new HashSet<>();
final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
for (String outDateConnectionId : outDatedConnections) {
try {
Connection connection = getConnection(outDateConnectionId);
if (connection != null) {
ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public long getTimeout() {
return 1000L;
}
@Override
public void onResponse(Response response) {
latch.countDown();
if (response != null && response.isSuccess()) {
connection.freshActiveTime();
successConnections.add(outDateConnectionId);
}
}
@Override
public void onException(Throwable e) {
latch.countDown();
}
});
Loggers.REMOTE_DIGEST
.info("[{}]send connection active request ", outDateConnectionId);
} else {
latch.countDown();
}
} catch (ConnectionAlreadyClosedException e) {
latch.countDown();
} catch (Exception e) {
Loggers.REMOTE_DIGEST
.error("[{}]Error occurs when check client active detection ,error={}",
outDateConnectionId, e);
latch.countDown();
}
}
latch.await(3000L, TimeUnit.MILLISECONDS);
Loggers.REMOTE_DIGEST
.info("Out dated connection check successCount={}", successConnections.size());
for (String outDateConnectionId : outDatedConnections) {
if (!successConnections.contains(outDateConnectionId)) {
Loggers.REMOTE_DIGEST
.info("[{}]Unregister Out dated connection....", outDateConnectionId);
unregister(outDateConnectionId);
}
} }
} }
//reset loader client //reset loader client
if (isLoaderClient) { if (isLoaderClient) {
loadClient = -1; loadClient = -1;
redirectAddress = null; redirectAddress = null;
} }
Loggers.REMOTE_DIGEST.info("Connection check task end");
} catch (Throwable e) { } catch (Throwable e) {
Loggers.REMOTE.error("error occurs when healthy check... ", e); Loggers.REMOTE.error("Error occurs during connection check... ", e);
} }
} }
}, 1000L, 3000L, TimeUnit.MILLISECONDS); }, 1000L, 3000L, TimeUnit.MILLISECONDS);

View File

@ -0,0 +1,40 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.control;
/**
* MonitorType.
*
* @author liuzunfei
* @version $Id: ClientIpMonitorKey.java, v 0.1 2021年01月20日 20:38 PM liuzunfei Exp $
*/
public class ClientIpMonitorKey extends MonitorKey {
public ClientIpMonitorKey() {
}
public ClientIpMonitorKey(String clientIp) {
this.key = clientIp;
}
@Override
public String getType() {
return "clientIp";
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.control;
/**
* ConnectionIdMonitorKey.
*
* @author liuzunfei
* @version $Id: ConnectionIdMonitorKey.java, v 0.1 2021年01月20日 20:38 PM liuzunfei Exp $
*/
public class ConnectionIdMonitorKey extends MonitorKey {
String key;
public ConnectionIdMonitorKey() {
}
public ConnectionIdMonitorKey(String clientIp) {
this.key = clientIp;
}
@Override
public String getType() {
return "connectionId";
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.control;
import com.alibaba.nacos.api.common.Constants;
/**
* MonitorType.
*
* @author liuzunfei
* @version $Id: MonitorKey.java, v 0.1 2021年01月20日 20:38 PM liuzunfei Exp $
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class MonitorKey {
String key;
public MonitorKey() {
}
public MonitorKey(String key) {
this.key = key;
}
/**
* get monitor key type.
*
* @return
*/
public abstract String getType();
public String getKey() {
return this.key;
}
public void setKey(String key) {
this.key = key;
}
public String build() {
return this.getType() + Constants.COLON + this.getKey();
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.control;
/**
* MonitorType.
*
* @author liuzunfei
* @version $Id: MonitorType.java, v 0.1 2021年01月20日 20:38 PM liuzunfei Exp $
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class MonitorKeyParser {
/**
* parse monitor key.
*
* @param arg0 parameters.
* @return
*/
public abstract MonitorKey parse(Object... arg0);
}

View File

@ -37,4 +37,19 @@ public enum MonitorType {
this.desc = desc; this.desc = desc;
} }
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
} }

View File

@ -35,4 +35,11 @@ public @interface TpsControl {
*/ */
String pointName(); String pointName();
/**
* Resource name parser. Should have lower priority than resource().
*
* @return class type of resource parser
*/
Class[] parsers() default {};
} }

View File

@ -26,6 +26,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
/** /**
* tps control point. * tps control point.
@ -52,8 +54,28 @@ public class TpsControlRequestFilter extends AbstractRequestFilter {
if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) { if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
TpsControl tpsControl = method.getAnnotation(TpsControl.class); TpsControl tpsControl = method.getAnnotation(TpsControl.class);
String pointName = tpsControl.pointName(); String pointName = tpsControl.pointName();
boolean pass = tpsMonitorManager.applyTps(meta.getClientIp(), pointName); Class[] parsers = tpsControl.parsers();
List<MonitorKey> monitorKeys = new ArrayList<>();
monitorKeys.add(new ClientIpMonitorKey(meta.getClientIp()));
if (parsers != null) {
for (Class clazz : parsers) {
try {
if (MonitorKeyParser.class.isAssignableFrom(clazz)) {
MonitorKey parseKey = ((MonitorKeyParser) (clazz.newInstance())).parse(request, meta);
if (parseKey != null) {
monitorKeys.add(parseKey);
}
}
} catch (Throwable throwable) {
//ignore
}
}
}
boolean pass = tpsMonitorManager.applyTps(pointName, meta.getConnectionId(), monitorKeys);
if (!pass) { if (!pass) {
Response response = null; Response response = null;
try { try {
@ -62,7 +84,7 @@ public class TpsControlRequestFilter extends AbstractRequestFilter {
return response; return response;
} catch (Exception e) { } catch (Exception e) {
Loggers.TPS_CONTROL_DETAIL Loggers.TPS_CONTROL_DETAIL
.warn("auth fail, request: {},exception:{}", request.getClass().getSimpleName(), e); .warn("Tps monitor fail , request: {},exception:{}", request.getClass().getSimpleName(), e);
return null; return null;
} }

View File

@ -31,7 +31,7 @@ public class TpsControlRule {
private Rule pointRule; private Rule pointRule;
private Map<String, Rule> ipRule = new HashMap<String, Rule>(); private Map<String, Rule> monitorKeyRule = new HashMap<String, Rule>();
public String getPointName() { public String getPointName() {
return pointName; return pointName;
@ -49,12 +49,12 @@ public class TpsControlRule {
this.pointRule = pointRule; this.pointRule = pointRule;
} }
public Map<String, Rule> getIpRule() { public Map<String, Rule> getMonitorKeyRule() {
return ipRule; return monitorKeyRule;
} }
public void setIpRule(Map<String, Rule> ipRule) { public void setMonitorKeyRule(Map<String, Rule> monitorKeyRule) {
this.ipRule = ipRule; this.monitorKeyRule = monitorKeyRule;
} }
public static class Rule { public static class Rule {
@ -99,7 +99,7 @@ public class TpsControlRule {
@Override @Override
public String toString() { public String toString() {
return "TpsControlRule{" + "pointName='" + pointName + '\'' + ", pointRule=" + pointRule + ", ipRule=" + ipRule return "TpsControlRule{" + "pointName='" + pointName + '\'' + ", pointRule=" + pointRule + ", monitorKeyRule="
+ '}'; + monitorKeyRule + '}';
} }
} }

View File

@ -35,6 +35,8 @@ import org.springframework.stereotype.Service;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -129,9 +131,24 @@ public class TpsMonitorManager extends Subscriber<TpsControlRuleChangeEvent> {
* @param pointName pointName. * @param pointName pointName.
* @return pass or not. * @return pass or not.
*/ */
public boolean applyTps(String clientIp, String pointName) { public boolean applyTpsForClientIp(String pointName, String connectionId, String clientIp) {
if (points.containsKey(pointName)) { if (points.containsKey(pointName)) {
return points.get(pointName).applyTps(clientIp);
return points.get(pointName).applyTps(connectionId, Arrays.asList(new ClientIpMonitorKey(clientIp)));
}
return true;
}
/**
* apply tps.
*
* @param pointName pointName.
* @param monitorKeyList monitorKeyList.
* @return pass or not.
*/
public boolean applyTps(String pointName, String connectionId, List<MonitorKey> monitorKeyList) {
if (points.containsKey(pointName)) {
return points.get(pointName).applyTps(connectionId, monitorKeyList);
} }
return true; return true;
} }
@ -199,20 +216,20 @@ public class TpsMonitorManager extends Subscriber<TpsControlRuleChangeEvent> {
stringBuilder.append(point).append("|").append("point|").append(formatString).append("|") stringBuilder.append(point).append("|").append("point|").append(formatString).append("|")
.append(pointSlot.tps.get()).append("|").append(pointSlot.interceptedTps.get()) .append(pointSlot.tps.get()).append("|").append(pointSlot.interceptedTps.get())
.append("\n"); .append("\n");
for (Map.Entry<String, TpsRecorder> entryIp : value.tpsRecordForIp.entrySet()) { for (Map.Entry<String, TpsRecorder> monitorKeyEntry : value.monitorKeysRecorder.entrySet()) {
String clientIp = entryIp.getKey(); String monitorKey = monitorKeyEntry.getKey();
TpsRecorder ipRecord = entryIp.getValue(); TpsRecorder ipRecord = monitorKeyEntry.getValue();
TpsRecorder.TpsSlot slotIp = ipRecord.getPoint(now - 1000L); TpsRecorder.TpsSlot keySlot = ipRecord.getPoint(now - 1000L);
if (slotIp == null) { if (keySlot == null) {
continue; continue;
} }
//already reported. //already reported.
if (lastReportSecond != 0L && lastReportSecond == slotIp.second) { if (lastReportSecond != 0L && lastReportSecond == keySlot.second) {
continue; continue;
} }
stringBuilder.append(point).append("|").append("ip|").append(clientIp).append("|") stringBuilder.append(point).append("|").append(monitorKey).append("|").append(formatString)
.append(formatString).append("|").append(slotIp.tps.get()).append("|") .append("|").append(keySlot.tps.get()).append("|").append(keySlot.interceptedTps.get())
.append(slotIp.interceptedTps.get()).append("\n"); .append("\n");
} }
} }

View File

@ -16,12 +16,15 @@
package com.alibaba.nacos.core.remote.control; package com.alibaba.nacos.core.remote.control;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.core.utils.Loggers;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@ -40,7 +43,7 @@ public class TpsMonitorPoint {
private TpsRecorder tpsRecorder; private TpsRecorder tpsRecorder;
Map<String, TpsRecorder> tpsRecordForIp = new HashMap<String, TpsRecorder>(); public Map<String, TpsRecorder> monitorKeysRecorder = new HashMap<String, TpsRecorder>();
public TpsMonitorPoint(String pointName) { public TpsMonitorPoint(String pointName) {
this(pointName, -1, "monitor"); this(pointName, -1, "monitor");
@ -80,47 +83,55 @@ public class TpsMonitorPoint {
} }
private void stopAllMonitorClient() { private void stopAllMonitorClient() {
tpsRecordForIp.clear(); monitorKeysRecorder.clear();
} }
/** /**
* increase tps. * increase tps.
* *
* @param clientIp client ip . * @param monitorKey monitorKey.
* @return check current tps is allowed. * @return check current tps is allowed.
*/ */
public boolean applyTps(String clientIp) { public boolean applyTps(String connectionId, List<MonitorKey> monitorKey) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
TpsRecorder.TpsSlot currentTps = tpsRecorder.createPointIfAbsent(now); TpsRecorder.TpsSlot currentTps = tpsRecorder.createPointIfAbsent(now);
//1.check ip tps. //1.check ip tps.
TpsRecorder.TpsSlot currentIpTps = null; List<TpsRecorder.TpsSlot> passedSlots = new ArrayList<>();
if (tpsRecordForIp.containsKey(clientIp)) {
TpsRecorder tpsRecorderIp = tpsRecordForIp.get(clientIp);
currentIpTps = tpsRecorderIp.createPointIfAbsent(now); if (CollectionUtils.isNotEmpty(monitorKey)) {
long maxTpsOfIp = tpsRecorderIp.getMaxTps(); for (MonitorKey monitorKey0 : monitorKey) {
boolean overLimit = maxTpsOfIp >= 0 && currentIpTps.tps.longValue() >= maxTpsOfIp; if (monitorKeysRecorder.containsKey(monitorKey0.build())) {
TpsRecorder tpsRecorderKey = monitorKeysRecorder.get(monitorKey0.build());
TpsRecorder.TpsSlot currentKeySlot = tpsRecorderKey.createPointIfAbsent(now);
long maxTpsOfIp = tpsRecorderKey.getMaxTps();
boolean overLimit = maxTpsOfIp >= 0 && currentKeySlot.tps.longValue() >= maxTpsOfIp;
if (overLimit) { if (overLimit) {
Loggers.TPS_CONTROL_DETAIL Loggers.TPS_CONTROL_DETAIL
.info("tps over limit ,pointName=[{}],clientIp=[{}],barrier=[{}]monitorType={}", .info("[{}]Tps over limit ,pointName=[{}],barrier=[{}]monitorModel={}", connectionId,
this.getPointName(), clientIp, "ipRule", tpsRecorderIp.getMonitorType()); this.getPointName(), monitorKey0.getType(), tpsRecorderKey.getMonitorType());
if (tpsRecorderIp.isInterceptMode()) { if (tpsRecorderKey.isInterceptMode()) {
currentIpTps.interceptedTps.incrementAndGet(); currentKeySlot.interceptedTps.incrementAndGet();
currentTps.interceptedTps.incrementAndGet(); currentTps.interceptedTps.incrementAndGet();
return false; return false;
} }
} else {
passedSlots.add(currentKeySlot);
} }
} }
}
}
//2.check total tps. //2.check total tps.
long maxTps = tpsRecorder.getMaxTps(); long maxTps = tpsRecorder.getMaxTps();
boolean overLimit = maxTps >= 0 && currentTps.tps.longValue() >= maxTps; boolean overLimit = maxTps >= 0 && currentTps.tps.longValue() >= maxTps;
if (overLimit) { if (overLimit) {
Loggers.TPS_CONTROL_DETAIL.info("tps over limit ,pointName=[{}],clientIp=[{}],barrier=[{}]monitorType={}", Loggers.TPS_CONTROL_DETAIL
this.getPointName(), clientIp, "pointRule", tpsRecorder.getMonitorType()); .info("[{}]Tps over limit ,pointName=[{}],barrier=[{}]monitorType={}", connectionId,
this.getPointName(), "pointRule", tpsRecorder.getMonitorType());
if (tpsRecorder.isInterceptMode()) { if (tpsRecorder.isInterceptMode()) {
currentTps.interceptedTps.incrementAndGet(); currentTps.interceptedTps.incrementAndGet();
return false; return false;
@ -128,8 +139,8 @@ public class TpsMonitorPoint {
} }
currentTps.tps.incrementAndGet(); currentTps.tps.incrementAndGet();
if (currentIpTps != null) { for (TpsRecorder.TpsSlot passedTpsSlot : passedSlots) {
currentIpTps.tps.incrementAndGet(); passedTpsSlot.tps.incrementAndGet();
} }
//3.check pass. //3.check pass.
return true; return true;
@ -180,38 +191,39 @@ public class TpsMonitorPoint {
} }
//3.check rule for ips. //3.check rule for ips.
Map<String, TpsControlRule.Rule> ipRule = controlRule.getIpRule(); Map<String, TpsControlRule.Rule> monitorKeyRules = controlRule.getMonitorKeyRule();
if (controlRule.getIpRule() == null || ipRule.isEmpty()) { if (monitorKeyRules == null || monitorKeyRules.isEmpty()) {
Loggers.TPS_CONTROL.info("Clear point control rule for client ips, pointName=[{}] ", this.getPointName()); Loggers.TPS_CONTROL
.info("Clear point control rule for monitorKeys, pointName=[{}] ", this.getPointName());
this.stopAllMonitorClient(); this.stopAllMonitorClient();
} else { } else {
Map<String, TpsRecorder> tpsRecordForIp = this.tpsRecordForIp; Map<String, TpsRecorder> tpsRecordForIp = this.monitorKeysRecorder;
for (Map.Entry<String, TpsControlRule.Rule> clientIpRule : ipRule.entrySet()) { for (Map.Entry<String, TpsControlRule.Rule> monitorRule : monitorKeyRules.entrySet()) {
if (clientIpRule.getValue() == null) { if (monitorRule.getValue() == null) {
continue; continue;
} }
//update rule. //update rule.
if (tpsRecordForIp.containsKey(clientIpRule.getKey())) { if (tpsRecordForIp.containsKey(monitorRule.getKey())) {
TpsRecorder tpsRecorder = tpsRecordForIp.get(clientIpRule.getKey()); TpsRecorder tpsRecorder = tpsRecordForIp.get(monitorRule.getKey());
Loggers.TPS_CONTROL Loggers.TPS_CONTROL
.info("Update point control rule for client ip ,pointName=[{}],client ip=[{}],original maxTps={}" .info("Update point control rule for client ip ,pointName=[{}],monitorKey=[{}],original maxTps={}"
+ ", new maxTps={},original monitorType={}, new monitorType={}, ", + ", new maxTps={},original monitorType={}, new monitorType={}, ",
this.getPointName(), clientIpRule.getKey(), tpsRecorder.getMaxTps(), this.getPointName(), monitorRule.getKey(), tpsRecorder.getMaxTps(),
clientIpRule.getValue().maxTps, tpsRecorder.getMonitorType(), monitorRule.getValue().maxTps, tpsRecorder.getMonitorType(),
clientIpRule.getValue().monitorType); monitorRule.getValue().monitorType);
tpsRecorder.setMaxTps(clientIpRule.getValue().maxTps); tpsRecorder.setMaxTps(monitorRule.getValue().maxTps);
tpsRecorder.setMonitorType(clientIpRule.getValue().monitorType); tpsRecorder.setMonitorType(monitorRule.getValue().monitorType);
} else { } else {
Loggers.TPS_CONTROL Loggers.TPS_CONTROL
.info("Add point control rule for client ip ,pointName=[{}],client ip=[{}], new maxTps={}, new monitorType={}, ", .info("Add point control rule for client ip ,pointName=[{}],monitorKey=[{}], new maxTps={}, new monitorType={}, ",
this.getPointName(), clientIpRule.getKey(), clientIpRule.getValue().maxTps, this.getPointName(), monitorRule.getKey(), monitorRule.getValue().maxTps,
clientIpRule.getValue().monitorType); monitorRule.getValue().monitorType);
// add rule // add rule
TpsRecorder tpsRecorderAdd = new TpsRecorder(startTime, DEFAULT_RECORD_SIZE); TpsRecorder tpsRecorderAdd = new TpsRecorder(startTime, DEFAULT_RECORD_SIZE);
tpsRecorderAdd.setMaxTps(clientIpRule.getValue().maxTps); tpsRecorderAdd.setMaxTps(monitorRule.getValue().maxTps);
tpsRecorderAdd.setMonitorType(clientIpRule.getValue().monitorType); tpsRecorderAdd.setMonitorType(monitorRule.getValue().monitorType);
tpsRecordForIp.put(clientIpRule.getKey(), tpsRecorderAdd); tpsRecordForIp.put(monitorRule.getKey(), tpsRecorderAdd);
} }
} }
@ -220,8 +232,8 @@ public class TpsMonitorPoint {
Iterator<Map.Entry<String, TpsRecorder>> iteratorCurrent = tpsRecordForIp.entrySet().iterator(); Iterator<Map.Entry<String, TpsRecorder>> iteratorCurrent = tpsRecordForIp.entrySet().iterator();
while (iteratorCurrent.hasNext()) { while (iteratorCurrent.hasNext()) {
Map.Entry<String, TpsRecorder> next1 = iteratorCurrent.next(); Map.Entry<String, TpsRecorder> next1 = iteratorCurrent.next();
if (!ipRule.containsKey(next1.getKey())) { if (!monitorKeyRules.containsKey(next1.getKey())) {
Loggers.TPS_CONTROL.info("Delete point control rule for client ip ,pointName=[{}],client ip=[{}]", Loggers.TPS_CONTROL.info("Delete point control rule for pointName=[{}] ,monitorKey=[{}]",
this.getPointName(), next1.getKey()); this.getPointName(), next1.getKey());
iteratorCurrent.remove(); iteratorCurrent.remove();
} }

View File

@ -122,7 +122,6 @@ public class GrpcConnection extends Connection {
String connectionId = null; String connectionId = null;
try { try {
if (isConnected()) {
connectionId = getMetaInfo().getConnectionId(); connectionId = getMetaInfo().getConnectionId();
if (isTraced()) { if (isTraced()) {
@ -131,7 +130,6 @@ public class GrpcConnection extends Connection {
closeBiStream(); closeBiStream();
channel.close(); channel.close();
}
} catch (Exception e) { } catch (Exception e) {
Loggers.REMOTE_DIGEST.warn("[{}] connection close exception : {}", connectionId, e); Loggers.REMOTE_DIGEST.warn("[{}] connection close exception : {}", connectionId, e);

View File

@ -16,12 +16,19 @@
package com.alibaba.nacos.test.remote; package com.alibaba.nacos.test.remote;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.remote.control.ClientIpMonitorKey;
import com.alibaba.nacos.core.remote.control.MonitorType;
import com.alibaba.nacos.core.remote.control.TpsControlRule;
import com.alibaba.nacos.core.remote.control.TpsMonitorManager; import com.alibaba.nacos.core.remote.control.TpsMonitorManager;
import com.alibaba.nacos.core.remote.control.TpsMonitorPoint; import com.alibaba.nacos.core.remote.control.TpsMonitorPoint;
import com.alibaba.nacos.core.remote.control.TpsRecorder;
import org.apache.commons.collections.map.HashedMap;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
public class TpsMonitorManagerTest { public class TpsMonitorManagerTest {
@ -36,17 +43,44 @@ public class TpsMonitorManagerTest {
testPoints = Arrays testPoints = Arrays
.asList("test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10"); .asList("test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10");
for (String point : testPoints) { for (String point : testPoints) {
tpsMonitorManager.registerTpsControlPoint(new TpsMonitorPoint(point)); TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(point);
TpsControlRule tpsControlRule = new TpsControlRule();
Map<String, TpsControlRule.Rule> monitorKeyRule = new HashedMap();
monitorKeyRule.put(new ClientIpMonitorKey("1").build(),
new TpsControlRule.Rule(10, MonitorType.INTERCEPT.getType()));
monitorKeyRule.put(new ClientIpMonitorKey("5").build(),
new TpsControlRule.Rule(10, MonitorType.INTERCEPT.getType()));
tpsControlRule.setMonitorKeyRule(monitorKeyRule);
tpsControlRule.setPointRule(new TpsControlRule.Rule(100,MonitorType.INTERCEPT.getType()));
tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);
System.out.println(JacksonUtils.toJson(tpsControlRule));
tpsMonitorPoint.applyRule(tpsControlRule);
} }
} }
@Test
public void printlnJson() {
TpsControlRule tpsControlRule = new TpsControlRule();
Map<String, TpsControlRule.Rule> monitorKeyRule = new HashedMap();
monitorKeyRule.put(new ClientIpMonitorKey("1").build(),
new TpsControlRule.Rule(10, MonitorType.INTERCEPT.getType()));
monitorKeyRule.put(new ClientIpMonitorKey("5").build(),
new TpsControlRule.Rule(10, MonitorType.INTERCEPT.getType()));
tpsControlRule.setMonitorKeyRule(monitorKeyRule);
tpsControlRule.setPointRule(new TpsControlRule.Rule(100,MonitorType.INTERCEPT.getType()));
System.out.println(JacksonUtils.toJson(tpsControlRule));
}
@Test @Test
public void runTest() { public void runTest() {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Random random = new Random(); Random random = new Random();
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
tpsMonitorManager.applyTps("", testPoints.get(random.nextInt(testPoints.size()))); tpsMonitorManager.applyTps(testPoints.get(random.nextInt(testPoints.size())), "connectionId1",
Arrays.asList(new ClientIpMonitorKey(""+new Random().nextInt(10))));
//tpsMonitorManager.applyTpsForClientIp(testPoints.get(random.nextInt(testPoints.size())), "", "");
try { try {
Thread.sleep(1L); Thread.sleep(1L);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -56,5 +90,10 @@ public class TpsMonitorManagerTest {
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
System.out.println("Time costs:" + (end - start)); System.out.println("Time costs:" + (end - start));
System.out.println(tpsMonitorManager.points.get("test1").getTpsRecorder().getSlotList()); System.out.println(tpsMonitorManager.points.get("test1").getTpsRecorder().getSlotList());
for (Map.Entry<String, TpsRecorder> entry : tpsMonitorManager.points.get("test1").monitorKeysRecorder.entrySet()) {
System.out.println("Monitor Key:" + entry.getKey());
System.out.println(entry.getValue().getSlotList());
}
} }
} }