负载均衡bugfix;监听并发问题bugfix;direct memory泄漏bugfix (#4365)

* memory gc optimize ; string pool bugfix

* grpc executor

* grpc executor

* 负载均衡bugfix;监听并发问题bugfix;direct memory泄漏bugfix

* check style fix
This commit is contained in:
nov.lzf 2020-11-30 16:17:34 +08:00 committed by GitHub
parent c1e4068dfe
commit ad98020770
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 378 additions and 237 deletions

View File

@ -81,8 +81,10 @@ public class DefaultRequestFuture implements RequestFuture {
this.requestCallBack = requestCallBack; this.requestCallBack = requestCallBack;
this.requestId = requestId; this.requestId = requestId;
this.connectionId = connectionId; this.connectionId = connectionId;
this.timeoutFuture = RpcScheduledExecutor.TIMEOUT_SHEDULER if (requestCallBack != null) {
.schedule(new TimeoutHandler(), requestCallBack.getTimeout(), TimeUnit.MILLISECONDS); this.timeoutFuture = RpcScheduledExecutor.TIMEOUT_SHEDULER
.schedule(new TimeoutHandler(), requestCallBack.getTimeout(), TimeUnit.MILLISECONDS);
}
this.timeoutInnerTrigger = timeoutInnerTrigger; this.timeoutInnerTrigger = timeoutInnerTrigger;
} }

View File

@ -83,4 +83,11 @@ public interface Requester {
* close connection. * close connection.
*/ */
public void close(); public void close();
/**
* check this requester is busy.
*
* @return
*/
public boolean isBusy();
} }

View File

@ -40,6 +40,11 @@ public abstract class Connection implements Requester {
this.serverInfo = serverInfo; this.serverInfo = serverInfo;
} }
@Override
public boolean isBusy() {
return false;
}
/** /**
* Getter method for property <tt>abandon</tt>. * Getter method for property <tt>abandon</tt>.
* *

View File

@ -78,8 +78,7 @@ public class GrpcConnection extends Connection {
try { try {
grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS); grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); throw new NacosException(NacosException.SERVER_ERROR, e);
return null;
} }
Response response = (Response) GrpcUtils.parse(grpcResponse).getBody(); Response response = (Response) GrpcUtils.parse(grpcResponse).getBody();
@ -101,13 +100,7 @@ public class GrpcConnection extends Connection {
@Override @Override
public Response get() throws InterruptedException, ExecutionException { public Response get() throws InterruptedException, ExecutionException {
Payload grpcResponse = null; Payload grpcResponse = null;
try { grpcResponse = requestFuture.get();
grpcResponse = requestFuture.get();
} catch (Exception e) {
e.printStackTrace();
return null;
}
Response response = (Response) GrpcUtils.parse(grpcResponse).getBody(); Response response = (Response) GrpcUtils.parse(grpcResponse).getBody();
return response; return response;
} }
@ -143,7 +136,7 @@ public class GrpcConnection extends Connection {
throws NacosException { throws NacosException {
Payload grpcRequest = GrpcUtils.convert(request, requestMeta); Payload grpcRequest = GrpcUtils.convert(request, requestMeta);
ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest); ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
//set callback . //set callback .
Futures.addCallback(requestFuture, new FutureCallback<Payload>() { Futures.addCallback(requestFuture, new FutureCallback<Payload>() {
@Override @Override
@ -177,6 +170,10 @@ public class GrpcConnection extends Connection {
@Override @Override
public void close() { public void close() {
if (this.payloadStreamObserver != null) {
payloadStreamObserver.onCompleted();
}
if (this.channel != null && !channel.isShutdown()) { if (this.channel != null && !channel.isShutdown()) {
this.channel.shutdownNow(); this.channel.shutdownNow();
} }

View File

@ -26,6 +26,10 @@ public class ConnectionAlreadyClosedException extends RemoteException {
private static final int CONNECTION_ALREADY_CLOSED = 600; private static final int CONNECTION_ALREADY_CLOSED = 600;
public ConnectionAlreadyClosedException(String msg) {
super(CONNECTION_ALREADY_CLOSED);
}
public ConnectionAlreadyClosedException() { public ConnectionAlreadyClosedException() {
super(CONNECTION_ALREADY_CLOSED); super(CONNECTION_ALREADY_CLOSED);
} }

View File

@ -0,0 +1,36 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.remote.exception;
/**
* connection is busy exception.
*
* @author liuzunfei
* @version $Id: ConnectionBusyException.java, v 0.1 2020年11月30日 7:28 PM liuzunfei Exp $
*/
public class ConnectionBusyException extends RemoteException {
private static final int CONNECTION_BUSY = 601;
public ConnectionBusyException(String msg) {
super(CONNECTION_BUSY, msg);
}
public ConnectionBusyException(Throwable throwable) {
super(CONNECTION_BUSY, throwable);
}
}

View File

@ -30,6 +30,10 @@ public class RemoteException extends NacosRuntimeException {
super(errorCode); super(errorCode);
} }
public RemoteException(int errorCode, String msg) {
super(errorCode, msg);
}
public RemoteException(int errorCode, Throwable throwable) { public RemoteException(int errorCode, Throwable throwable) {
super(errorCode, throwable); super(errorCode, throwable);
} }

View File

@ -17,11 +17,12 @@
package com.alibaba.nacos.config.server.remote; package com.alibaba.nacos.config.server.remote;
import com.alibaba.nacos.common.utils.CollectionUtils; import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.MapUtil;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -36,28 +37,27 @@ import java.util.concurrent.ConcurrentHashMap;
public class ConfigChangeListenContext { public class ConfigChangeListenContext {
/** /**
* groupKey-> connnection set. * groupKey-> connection set.
*/ */
private final Map<String, Set<String>> groupKeyContext = new ConcurrentHashMap<String, Set<String>>(128); private ConcurrentHashMap<String, HashSet<String>> groupKeyContext = new ConcurrentHashMap<String, HashSet<String>>();
/** /**
* connectionId-> groupkey set. * connectionId-> group key set.
*/ */
private final Map<String, HashMap<String, String>> connectionIdContext = new ConcurrentHashMap<String, HashMap<String, String>>(128); private ConcurrentHashMap<String, HashMap<String, String>> connectionIdContext = new ConcurrentHashMap<String, HashMap<String, String>>();
/** /**
* add listen . * add listen.
* *
* @param listenKey listenKey. * @param groupKey groupKey.
* @param connectionId connectionId. * @param connectionId connectionId.
*/ */
public void addListen(String listenKey, String md5, String connectionId) { public synchronized void addListen(String groupKey, String md5, String connectionId) {
// 1.add groupKeyContext // 1.add groupKeyContext
Set<String> listenClients = groupKeyContext.get(listenKey); Set<String> listenClients = groupKeyContext.get(groupKey);
if (listenClients == null) { if (listenClients == null) {
groupKeyContext.putIfAbsent(listenKey, new HashSet<String>()); groupKeyContext.putIfAbsent(groupKey, new HashSet<String>());
listenClients = groupKeyContext.get(listenKey); listenClients = groupKeyContext.get(groupKey);
} }
listenClients.add(connectionId); listenClients.add(connectionId);
@ -67,80 +67,109 @@ public class ConfigChangeListenContext {
connectionIdContext.putIfAbsent(connectionId, new HashMap<String, String>(16)); connectionIdContext.putIfAbsent(connectionId, new HashMap<String, String>(16));
groupKeys = connectionIdContext.get(connectionId); groupKeys = connectionIdContext.get(connectionId);
} }
groupKeys.put(listenKey, md5); groupKeys.put(groupKey, md5);
} }
/** /**
* remove listen context for connection id . * remove listen context for connection id .
* *
* @param listenKey listenKey. * @param groupKey groupKey.
* @param connectionId connection id. * @param connectionId connection id.
*/ */
public void removeListen(String listenKey, String connectionId) { public synchronized void removeListen(String groupKey, String connectionId) {
//1. remove groupKeyContext //1. remove groupKeyContext
Set<String> connectionIds = groupKeyContext.get(listenKey); Set<String> connectionIds = groupKeyContext.get(groupKey);
if (connectionIds != null) { if (connectionIds != null) {
connectionIds.remove(connectionId); connectionIds.remove(connectionId);
if (connectionIds.isEmpty()) { if (connectionIds.isEmpty()) {
MapUtil.removeKey(groupKeyContext, listenKey, CollectionUtils::isEmpty); groupKeyContext.remove(groupKey);
} }
} }
//2.remove connectionIdContext //2.remove connectionIdContext
HashMap<String, String> groupKeys = connectionIdContext.get(connectionId); HashMap<String, String> groupKeys = connectionIdContext.get(connectionId);
if (groupKeys != null) { if (groupKeys != null) {
groupKeys.remove(listenKey); groupKeys.remove(groupKey);
} }
} }
public Set<String> getListeners(String listenKey) { /**
return groupKeyContext.get(listenKey); * get listeners of the group key.
*
* @param groupKey groupKey.
* @return the copy of listeners, may be return null.
*/
public synchronized Set<String> getListeners(String groupKey) {
HashSet<String> strings = groupKeyContext.get(groupKey);
if (CollectionUtils.isNotEmpty(strings)) {
Set<String> listenConnections = new HashSet<String>();
safeCopy(strings, listenConnections);
return listenConnections;
}
return null;
} }
/** /**
* remove the context related to the connectionid. * copy collections.
*
* @param src may be modified concurrently
* @param dest dest collection
*/
private void safeCopy(Collection src, Collection dest) {
Iterator iterator = src.iterator();
while (iterator.hasNext()) {
dest.add(iterator.next());
}
}
/**
* remove the context related to the connection id.
* *
* @param connectionId connectionId. * @param connectionId connectionId.
*/ */
public void clearContextForConnectionId(final String connectionId) { public synchronized void clearContextForConnectionId(final String connectionId) {
Map<String, String> listenKeys = getListenKeys(connectionId); Map<String, String> listenKeys = getListenKeys(connectionId);
if (listenKeys != null) { if (listenKeys != null) {
for (Map.Entry<String, String> groupKey : listenKeys.entrySet()) { for (Map.Entry<String, String> groupKey : listenKeys.entrySet()) {
Set<String> connectionIds = groupKeyContext.get(groupKey.getKey()); Set<String> connectionIds = groupKeyContext.get(groupKey.getKey());
if (CollectionUtils.isNotEmpty(connectionIds)) { if (CollectionUtils.isNotEmpty(connectionIds)) {
connectionIds.remove(connectionId); connectionIds.remove(connectionId);
} else { } else {
MapUtil.removeKey(groupKeyContext, groupKey.getKey(), CollectionUtils::isEmpty); groupKeyContext.remove(groupKey.getKey());
} }
} }
} }
MapUtil.removeKey(connectionIdContext, connectionId, MapUtil::isEmpty);
connectionIdContext.remove(connectionId);
} }
/** /**
* get listenkeys. * get listen keys.
* *
* @param connectionId connetionid. * @param connectionId connection id.
* @return * @return listen group keys of the connection id, key:group key,value:md5
*/ */
public Map<String, String> getListenKeys(String connectionId) { public synchronized Map<String, String> getListenKeys(String connectionId) {
return connectionIdContext.get(connectionId); Map<String, String> copy = new HashMap<String, String>(connectionIdContext.get(connectionId));
return copy;
} }
/** /**
* get listenkey. * get md5.
* *
* @param connectionId connetionid. * @param connectionId connection id.
* @return * @return md5 of the listen group key.
*/ */
public String getListenKeyMd5(String connectionId, String groupKey) { public String getListenKeyMd5(String connectionId, String groupKey) {
Map<String, String> groupKeyContexts = connectionIdContext.get(connectionId); Map<String, String> groupKeyContexts = connectionIdContext.get(connectionId);
return groupKeyContexts == null ? null : groupKeyContexts.get(groupKey); return groupKeyContexts == null ? null : groupKeyContexts.get(groupKey);
} }
} }

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.api.remote.response.AbstractPushCallBack;
import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
import com.alibaba.nacos.common.utils.CollectionUtils; import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent; import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent;
import com.alibaba.nacos.config.server.utils.ConfigExecutor; import com.alibaba.nacos.config.server.utils.ConfigExecutor;
@ -30,9 +31,9 @@ import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RpcPushService; import com.alibaba.nacos.core.remote.RpcPushService;
import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -46,55 +47,52 @@ import java.util.concurrent.TimeUnit;
@Component(value = "rpcConfigChangeNotifier") @Component(value = "rpcConfigChangeNotifier")
public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> { public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
final ConfigChangeListenContext configChangeListenContext; public RpcConfigChangeNotifier() {
private final RpcPushService rpcPushService;
private final ConnectionManager connectionManager;
public RpcConfigChangeNotifier(ConfigChangeListenContext configChangeListenContext, RpcPushService rpcPushService,
ConnectionManager connectionManager) {
NotifyCenter.registerSubscriber(this); NotifyCenter.registerSubscriber(this);
this.configChangeListenContext = configChangeListenContext;
this.rpcPushService = rpcPushService;
this.connectionManager = connectionManager;
} }
@Autowired
ConfigChangeListenContext configChangeListenContext;
@Autowired
private RpcPushService rpcPushService;
@Autowired
private ConnectionManager connectionManager;
/** /**
* adaptor to config module ,when server side config change ,invoke this method. * adaptor to config module ,when server side config change ,invoke this method.
* *
* @param groupKey groupKey * @param groupKey groupKey
* @param notifyRequest notifyRequest * @param notifyRequet notifyRequet
*/ */
public void configDataChanged(String groupKey, final ConfigChangeNotifyRequest notifyRequest) { public void configDataChanged(String groupKey, final ConfigChangeNotifyRequest notifyRequet) {
Set<String> listeners = configChangeListenContext.getListeners(groupKey); Set<String> listeners = configChangeListenContext.getListeners(groupKey);
if (listeners == null || listeners.isEmpty()) { if (CollectionUtils.isEmpty(listeners)) {
return; return;
} }
Set<String> clients = new HashSet<>(listeners);
int notifyCount = 0; int notifyCount = 0;
if (!CollectionUtils.isEmpty(clients)) { for (final String client : listeners) {
for (final String client : clients) { Connection connection = connectionManager.getConnection(client);
Connection connection = connectionManager.getConnection(client); if (connection == null) {
if (connection == null) { continue;
}
if (notifyRequet.isBeta()) {
List<String> betaIps = notifyRequet.getBetaIps();
if (betaIps != null && !betaIps.contains(connection.getMetaInfo().getClientIp())) {
continue; continue;
} }
if (notifyRequest.isBeta()) {
List<String> betaIps = notifyRequest.getBetaIps();
if (betaIps != null && !betaIps.contains(connection.getMetaInfo().getClientIp())) {
continue;
}
}
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client,
connection.getMetaInfo().getClientIp(), connection.getMetaInfo().getConnectionId());
push(rpcPushRetryTask);
notifyCount++;
} }
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequet, 50, client,
connection.getMetaInfo().getClientIp(), connection.getMetaInfo().getConnectionId());
push(rpcPushRetryTask);
notifyCount++;
} }
Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyCount, groupKey); Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyCount, groupKey);
} }
@ -104,10 +102,10 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
boolean isBeta = event.isBeta; boolean isBeta = event.isBeta;
List<String> betaIps = event.betaIps; List<String> betaIps = event.betaIps;
String[] strings = GroupKey.parseKey(groupKey); String[] strings = GroupKey.parseKey(groupKey);
String dataId = strings[0]; String dataid = strings[0];
String group = strings[1]; String group = strings[1];
String tenant = strings.length > 2 ? strings[2] : ""; String tenant = strings.length > 2 ? strings[2] : "";
ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant); ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataid, group, tenant);
notifyRequest.setBeta(isBeta); notifyRequest.setBeta(isBeta);
notifyRequest.setBetaIps(betaIps); notifyRequest.setBetaIps(betaIps);
if (PropertyUtil.isPushContent()) { if (PropertyUtil.isPushContent()) {
@ -128,25 +126,25 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
class RpcPushTask implements Runnable { class RpcPushTask implements Runnable {
ConfigChangeNotifyRequest notifyRequest; ConfigChangeNotifyRequest notifyRequet;
int maxRetryTimes = -1; int maxRetryTimes = -1;
int tryTimes = 0; int tryTimes = 0;
String clientId; String clientId;
String clientIp; String clientIp;
String appName; String appName;
public RpcPushTask(ConfigChangeNotifyRequest notifyRequet, String clientId, String clientIp, String appName) { public RpcPushTask(ConfigChangeNotifyRequest notifyRequet, String clientId, String clientIp, String appName) {
this(notifyRequet, -1, clientId, clientIp, appName); this(notifyRequet, -1, clientId, clientIp, appName);
} }
public RpcPushTask(ConfigChangeNotifyRequest notifyRequest, int maxRetryTimes, String clientId, String clientIp, public RpcPushTask(ConfigChangeNotifyRequest notifyRequet, int maxRetryTimes, String clientId, String clientIp,
String appName) { String appName) {
this.notifyRequest = notifyRequest; this.notifyRequet = notifyRequet;
this.maxRetryTimes = maxRetryTimes; this.maxRetryTimes = maxRetryTimes;
this.clientId = clientId; this.clientId = clientId;
this.clientIp = clientIp; this.clientIp = clientIp;
@ -159,19 +157,22 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
@Override @Override
public void run() { public void run() {
rpcPushService.pushWithCallback(clientId, notifyRequest, new AbstractPushCallBack(3000L) { rpcPushService.pushWithCallback(clientId, notifyRequet, new AbstractPushCallBack(3000L) {
int retryTimes = tryTimes; int retryTimes = tryTimes;
@Override @Override
public void onSuccess() { public void onSuccess() {
} }
@Override @Override
public void onFail(Throwable e) { public void onFail(Throwable e) {
if (e instanceof ConnectionAlreadyClosedException) {
Loggers.CORE.warn(e.getMessage());
}
push(RpcPushTask.this); push(RpcPushTask.this);
} }
}, ConfigExecutor.getClientConfigNotifierServiceExecutor()); }, ConfigExecutor.getClientConfigNotifierServiceExecutor());
tryTimes++; tryTimes++;
@ -179,7 +180,7 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
} }
private void push(RpcPushTask retryTask) { private void push(RpcPushTask retryTask) {
ConfigChangeNotifyRequest notifyRequet = retryTask.notifyRequest; ConfigChangeNotifyRequest notifyRequet = retryTask.notifyRequet;
if (retryTask.isOverTimes()) { if (retryTask.isOverTimes()) {
Loggers.CORE Loggers.CORE
.warn("push callback retry fail over times .dataId={},group={},tenant={},clientId={},will unregister client.", .warn("push callback retry fail over times .dataId={},group={},tenant={},clientId={},will unregister client.",

View File

@ -399,7 +399,7 @@ public class LongPollingService {
try { try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
// Delete subsciber's relations. // Delete subscriber's relations.
allSubs.remove(ClientLongPolling.this); allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) { if (isFixedPolling()) {

View File

@ -35,6 +35,7 @@ import com.alibaba.nacos.core.cluster.remote.ClusterRpcClientProxy;
import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.core.ServerLoaderInfoRequestHandler; import com.alibaba.nacos.core.remote.core.ServerLoaderInfoRequestHandler;
import com.alibaba.nacos.core.remote.core.ServerReloaderRequestHandler;
import com.alibaba.nacos.core.utils.RemoteUtils; import com.alibaba.nacos.core.utils.RemoteUtils;
import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -77,6 +78,9 @@ public class ServerLoaderController {
@Autowired @Autowired
private ClusterRpcClientProxy clusterRpcClientProxy; private ClusterRpcClientProxy clusterRpcClientProxy;
@Autowired
private ServerReloaderRequestHandler serverReloaderRequestHandler;
@Autowired @Autowired
private ServerLoaderInfoRequestHandler serverLoaderInfoRequestHandler; private ServerLoaderInfoRequestHandler serverLoaderInfoRequestHandler;
@ -184,15 +188,9 @@ public class ServerLoaderController {
} }
} }
List<ServerLoaderMetris> responseList = new LinkedList<ServerLoaderMetris>();
try { try {
ServerLoaderInfoResponse handle = serverLoaderInfoRequestHandler serverReloaderRequestHandler.handle(serverLoaderInfoRequest, new RequestMeta());
.handle(new ServerLoaderInfoRequest(), new RequestMeta());
ServerLoaderMetris metris = new ServerLoaderMetris();
metris.setAddress(serverMemberManager.getSelf().getAddress());
metris.setMetric(handle.getLoaderMetrics());
responseList.add(metris);
} catch (NacosException e) { } catch (NacosException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -206,9 +204,9 @@ public class ServerLoaderController {
@Secured(resource = NacosAuthConfig.CONSOLE_RESOURCE_NAME_PREFIX + "loader", action = ActionTypes.READ) @Secured(resource = NacosAuthConfig.CONSOLE_RESOURCE_NAME_PREFIX + "loader", action = ActionTypes.READ)
@GetMapping("/clustermetric") @GetMapping("/clustermetric")
public ResponseEntity clusterLoader() { public ResponseEntity clusterLoader() {
Map<String, Object> serverLoadMetrics = getServerLoadMetrics(); Map<String, Object> serverLoadMetrics = getServerLoadMetrics();
return ResponseEntity.ok().body(serverLoadMetrics); return ResponseEntity.ok().body(serverLoadMetrics);
} }

View File

@ -57,6 +57,11 @@ public abstract class Connection implements Requester {
return metaInfo; return metaInfo;
} }
@Override
public boolean isBusy() {
return false;
}
@Override @Override
public String toString() { public String toString() {
return ToStringBuilder.reflectionToString(this); return ToStringBuilder.reflectionToString(this);

View File

@ -80,14 +80,17 @@ public class ConnectionManager {
* @param connectionId connectionId * @param connectionId connectionId
* @param connection connection * @param connection connection
*/ */
public void register(String connectionId, Connection connection) { public synchronized void register(String connectionId, Connection connection) {
Connection connectionInner = connetions.put(connectionId, connection); if (connection.isConnected()) {
if (connectionInner == null) { Connection connectionInner = connetions.put(connectionId, connection);
clientConnectionEventListenerRegistry.notifyClientConnected(connection); if (connectionInner == null) {
Loggers.REMOTE clientConnectionEventListenerRegistry.notifyClientConnected(connection);
.info("new connection registered successfully, connectionid = {},connection={} ", connectionId, Loggers.REMOTE
connection); .info("new connection registered successfully, connectionid = {},connection={} ", connectionId,
connection);
}
} }
} }
/** /**
@ -95,7 +98,7 @@ public class ConnectionManager {
* *
* @param connectionId connectionId. * @param connectionId connectionId.
*/ */
public void unregister(String connectionId) { public synchronized void unregister(String connectionId) {
Connection remove = this.connetions.remove(connectionId); Connection remove = this.connetions.remove(connectionId);
if (remove != null) { if (remove != null) {
remove.close(); remove.close();
@ -164,12 +167,9 @@ public class ConnectionManager {
@Override @Override
public void run() { public void run() {
try { try {
Loggers.REMOTE.info("rpc ack size :{}", RpcAckCallbackSynchronizer.CALLBACK_CONTEXT.size());
;
MetricsMonitor.getLongConnectionMonitor().set(connetions.size()); MetricsMonitor.getLongConnectionMonitor().set(connetions.size());
long currentStamp = System.currentTimeMillis(); long currentStamp = System.currentTimeMillis();
Set<Map.Entry<String, Connection>> entries = connetions.entrySet(); Set<Map.Entry<String, Connection>> entries = connetions.entrySet();
boolean isLoaderClient = loadClient >= 0; boolean isLoaderClient = loadClient >= 0;
@ -183,19 +183,19 @@ public class ConnectionManager {
expelCount--; expelCount--;
} }
} }
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
String[] split = redirectAddress.split(Constants.COLON);
connectResetRequest.setServerIp(split[0]);
connectResetRequest.setServerPort(split[1]);
}
for (String expeledClientId : expelClient) { for (String expeledClientId : expelClient) {
try { try {
Connection connection = getConnection(expeledClientId); Connection connection = getConnection(expeledClientId);
if (connection != null) { if (connection != null) {
connection.asyncRequest(connectResetRequest, buildMeta(), null);
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(":")) {
String[] split = redirectAddress.split(Constants.COLON);
connectResetRequest.setServerIp(split[0]);
connectResetRequest.setServerPort(split[1]);
}
connection.request(connectResetRequest, buildMeta());
Loggers.REMOTE Loggers.REMOTE
.info("expel connection ,send switch server response connectionid = {},connectResetRequest={} ", .info("expel connection ,send switch server response connectionid = {},connectResetRequest={} ",
expeledClientId, connectResetRequest); expeledClientId, connectResetRequest);
@ -214,7 +214,7 @@ public class ConnectionManager {
redirectAddress = null; redirectAddress = null;
} }
} catch (Exception e) { } catch (Throwable e) {
Loggers.REMOTE.error("error occurs when heathy check... ", e); Loggers.REMOTE.error("error occurs when heathy check... ", e);
} }
} }

View File

@ -52,9 +52,7 @@ public class ServerReloaderRequestHandler extends RequestHandler<ServerReloadReq
if (sdkCount <= reloadCount) { if (sdkCount <= reloadCount) {
response.setMessage("ignore"); response.setMessage("ignore");
} else { } else {
if (reloadCount * (1 + RemoteUtils.LOADER_FACTOR) < sdkCount) { reloadCount = (int) Math.max(reloadCount, sdkCount * (1 - RemoteUtils.LOADER_FACTOR));
reloadCount = (int) (sdkCount * (1 - RemoteUtils.LOADER_FACTOR));
}
connectionManager.loadCount(reloadCount, null); connectionManager.loadCount(reloadCount, null);
response.setMessage("ok"); response.setMessage("ok");
} }

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.core.remote.grpc; package com.alibaba.nacos.core.remote.grpc;
import com.alibaba.nacos.api.grpc.auto.Payload; import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.utils.ReflectUtils; import com.alibaba.nacos.common.utils.ReflectUtils;
import com.alibaba.nacos.common.utils.UuidUtils; import com.alibaba.nacos.common.utils.UuidUtils;
@ -24,6 +25,7 @@ import com.alibaba.nacos.core.remote.BaseRpcServer;
import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry; import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.core.utils.Loggers;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.Contexts; import io.grpc.Contexts;
@ -31,6 +33,7 @@ import io.grpc.Grpc;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.Server; import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall; import io.grpc.ServerCall;
import io.grpc.ServerCallHandler; import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptor;
@ -38,7 +41,6 @@ import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition; import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter; import io.grpc.ServerTransportFilter;
import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStream;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.channel.Channel; import io.grpc.netty.shaded.io.netty.channel.Channel;
import io.grpc.protobuf.ProtoUtils; import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ServerCalls; import io.grpc.stub.ServerCalls;
@ -46,6 +48,7 @@ import io.grpc.util.MutableHandlerRegistry;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
/** /**
* Grpc implementation as a rpc server. * Grpc implementation as a rpc server.
@ -55,6 +58,8 @@ import java.net.InetSocketAddress;
*/ */
public abstract class BaseGrpcServer extends BaseRpcServer { public abstract class BaseGrpcServer extends BaseRpcServer {
private static Executor grpcExecutor;
private Server server; private Server server;
private static final String REQUEST_BI_STREAM_SERVICE_NAME = "BiRequestStream"; private static final String REQUEST_BI_STREAM_SERVICE_NAME = "BiRequestStream";
@ -106,8 +111,12 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
addServices(handlerRegistry, serverInterceptor); addServices(handlerRegistry, serverInterceptor);
server = NettyServerBuilder.forPort(getServicePort()).fallbackHandlerRegistry(handlerRegistry) grpcExecutor = ExecutorFactory.Managed
.addTransportFilter(new ServerTransportFilter() { .newCustomerThreadExecutor("core", Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 4, 10000,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("nacos-grpc-executor-%d").build());
server = ServerBuilder.forPort(getServicePort()).executor(grpcExecutor)
.fallbackHandlerRegistry(handlerRegistry).addTransportFilter(new ServerTransportFilter() {
@Override @Override
public Attributes transportReady(Attributes transportAttrs) { public Attributes transportReady(Attributes transportAttrs) {
InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
@ -120,17 +129,17 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
Attributes attrWraper = transportAttrs.toBuilder() Attributes attrWraper = transportAttrs.toBuilder()
.set(TRANS_KEY_CONN_ID, UuidUtils.generateUuid()).set(TRANS_KEY_CLIENT_IP, remoteIp) .set(TRANS_KEY_CONN_ID, UuidUtils.generateUuid()).set(TRANS_KEY_CLIENT_IP, remoteIp)
.set(TRANS_KEY_CLIENT_PORT, remotePort).set(TRANS_KEY_LOCAL_PORT, localPort).build(); .set(TRANS_KEY_CLIENT_PORT, remotePort).set(TRANS_KEY_LOCAL_PORT, localPort).build();
String connectionid = attrWraper.get(TRANS_KEY_CONN_ID); String connectionId = attrWraper.get(TRANS_KEY_CONN_ID);
Loggers.REMOTE.info(" connection transportReady,connectionid = {} ", connectionid); Loggers.REMOTE.info(" connection transportReady,connectionId = {} ", connectionId);
return attrWraper; return attrWraper;
} }
@Override @Override
public void transportTerminated(Attributes transportAttrs) { public void transportTerminated(Attributes transportAttrs) {
String connectionid = transportAttrs.get(TRANS_KEY_CONN_ID); String connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
Loggers.REMOTE.info(" connection transportTerminated,connectionid = {} ", connectionid); Loggers.REMOTE.info(" connection transportTerminated,connectionId = {} ", connectionId);
connectionManager.unregister(connectionid); connectionManager.unregister(connectionId);
} }
}).build(); }).build();
server.start(); server.start();

View File

@ -30,8 +30,8 @@ import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo; import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer; import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.utils.ApplicationUtils; import com.alibaba.nacos.sys.utils.ApplicationUtils;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -60,7 +60,7 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() { StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {
@Override @Override
public void onNext(Payload payload) { public void onNext(Payload payload) {
String connectionId = CONTEXT_KEY_CONN_ID.get(); String connectionId = CONTEXT_KEY_CONN_ID.get();
Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get(); Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get();
String clientIp = CONTEXT_KEY_CONN_CLIENT_IP.get(); String clientIp = CONTEXT_KEY_CONN_CLIENT_IP.get();
@ -76,9 +76,9 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(metadata.getConnectionId(), ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(metadata.getConnectionId(),
metadata.getClientIp(), metadata.getClientPort(), localPort, ConnectionType.GRPC.getType(), metadata.getClientIp(), metadata.getClientPort(), localPort, ConnectionType.GRPC.getType(),
metadata.getClientVersion(), metadata.getLabels()); metadata.getClientVersion(), metadata.getLabels());
Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get()); Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
if (connectionManager.isOverLimit() || !ApplicationUtils.isStarted()) { if (connectionManager.isOverLimit() || !ApplicationUtils.isStarted()) {
//Not register to the connection manager if current server is over limit or server is starting. //Not register to the connection manager if current server is over limit or server is starting.
try { try {
@ -89,12 +89,10 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
} }
return; return;
} }
//if connnection is already terminated,not register it. //if connection is already terminated,not register it.
if (connection.isConnected()) { connectionManager.register(connectionId, connection);
connectionManager.register(connectionId, connection);
}
} else if (plainRequest.getBody() instanceof Response) { } else if (plainRequest.getBody() instanceof Response) {
Response response = (Response) plainRequest.getBody(); Response response = (Response) plainRequest.getBody();
RpcAckCallbackSynchronizer.ackNotify(connectionId, response); RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
@ -104,8 +102,16 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
@Override @Override
public void onError(Throwable t) { public void onError(Throwable t) {
Loggers.REMOTE.error("grpc streamObserver error .", t); if (responseObserver instanceof ServerCallStreamObserver) {
responseObserver.onCompleted(); ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
if (serverCallStreamObserver.isCancelled()) {
//client close the stream.
return;
} else {
serverCallStreamObserver.onCompleted();
}
}
} }
@Override @Override

View File

@ -27,6 +27,7 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.utils.NetUtils; import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils; import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils;
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException; import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
import com.alibaba.nacos.common.remote.exception.ConnectionBusyException;
import com.alibaba.nacos.common.utils.VersionUtils; import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo; import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
@ -34,6 +35,7 @@ import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer;
import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.core.utils.Loggers;
import io.grpc.StatusRuntimeException; import io.grpc.StatusRuntimeException;
import io.grpc.netty.shaded.io.netty.channel.Channel; import io.grpc.netty.shaded.io.netty.channel.Channel;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import java.util.Map; import java.util.Map;
@ -58,7 +60,13 @@ public class GrpcConnection extends Connection {
private void sendRequestNoAck(Request request, RequestMeta meta) throws NacosException { private void sendRequestNoAck(Request request, RequestMeta meta) throws NacosException {
try { try {
streamObserver.onNext(GrpcUtils.convert(request, wrapMeta(meta))); //StreamObserver#onNext() is not thread-safe,synchronized is required to avoid direct memory leak.
synchronized (streamObserver) {
if (this.isBusy()) {
throw new ConnectionBusyException(this.getMetaInfo().getConnectionId() + ",connection busy.");
}
streamObserver.onNext(GrpcUtils.convert(request, wrapMeta(meta)));
}
} catch (Exception e) { } catch (Exception e) {
if (e instanceof StatusRuntimeException) { if (e instanceof StatusRuntimeException) {
throw new ConnectionAlreadyClosedException(e); throw new ConnectionAlreadyClosedException(e);
@ -89,14 +97,15 @@ public class GrpcConnection extends Connection {
String requestId = String.valueOf(PushAckIdGenerator.getNextId()); String requestId = String.valueOf(PushAckIdGenerator.getNextId());
request.setRequestId(requestId); request.setRequestId(requestId);
sendRequestNoAck(request, meta); sendRequestNoAck(request, meta);
DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(getMetaInfo().getConnectionId(), requestId, DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(getMetaInfo().getConnectionId(), requestId,
callBack, callBack, new DefaultRequestFuture.TimeoutInnerTrigger() {
new DefaultRequestFuture.TimeoutInnerTrigger() { @Override
@Override public void triggerOnTimeout() {
public void triggerOnTimeout() { RpcAckCallbackSynchronizer.clearFuture(getMetaInfo().getConnectionId(), requestId);
RpcAckCallbackSynchronizer.clearFuture(getMetaInfo().getConnectionId(), requestId); }
} });
});
RpcAckCallbackSynchronizer.syncCallback(getMetaInfo().getConnectionId(), requestId, defaultPushFuture); RpcAckCallbackSynchronizer.syncCallback(getMetaInfo().getConnectionId(), requestId, defaultPushFuture);
return defaultPushFuture; return defaultPushFuture;
} }
@ -137,14 +146,35 @@ public class GrpcConnection extends Connection {
@Override @Override
public void close() { public void close() {
try { try {
streamObserver.onCompleted(); if (isConnected()) {
closeBiStream();
channel.close();
}
} catch (Exception e) { } catch (Exception e) {
Loggers.REMOTE.debug(String.format("[%s] connection close exception : %s", "grpc", e.getMessage())); Loggers.REMOTE.debug(String.format("[%s] connection close exception : %s", "grpc", e.getMessage()));
} }
} }
private void closeBiStream() {
if (streamObserver instanceof ServerCallStreamObserver) {
ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) streamObserver);
if (!serverCallStreamObserver.isCancelled()) {
serverCallStreamObserver.onCompleted();
}
}
}
@Override
public boolean isBusy() {
if (streamObserver instanceof ServerCallStreamObserver) {
return !((ServerCallStreamObserver) streamObserver).isReady();
}
return false;
}
@Override @Override
public boolean isConnected() { public boolean isConnected() {
return channel.isActive(); return channel != null && channel.isOpen() && channel.isActive();
} }
} }

37
pom.xml
View File

@ -128,6 +128,7 @@
<!-- dependency version --> <!-- dependency version -->
<spring-boot-dependencies.version>2.1.17.RELEASE</spring-boot-dependencies.version> <spring-boot-dependencies.version>2.1.17.RELEASE</spring-boot-dependencies.version>
<servlet-api.version>3.0</servlet-api.version> <servlet-api.version>3.0</servlet-api.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-lang3.version>3.4</commons-lang3.version> <commons-lang3.version>3.4</commons-lang3.version>
<commons-io.version>2.2</commons-io.version> <commons-io.version>2.2</commons-io.version>
<commons-collections.version>3.2.2</commons-collections.version> <commons-collections.version>3.2.2</commons-collections.version>
@ -140,6 +141,7 @@
<httpcore.version>4.4.1</httpcore.version> <httpcore.version>4.4.1</httpcore.version>
<httpclient.version>4.5</httpclient.version> <httpclient.version>4.5</httpclient.version>
<httpasyncclient.version>4.1.3</httpasyncclient.version> <httpasyncclient.version>4.1.3</httpasyncclient.version>
<async-http-client.version>1.7.17</async-http-client.version>
<mysql-connector-java.version>8.0.16</mysql-connector-java.version> <mysql-connector-java.version>8.0.16</mysql-connector-java.version>
<derby.version>10.14.2.0</derby.version> <derby.version>10.14.2.0</derby.version>
<cglib-nodep.version>2.1</cglib-nodep.version> <cglib-nodep.version>2.1</cglib-nodep.version>
@ -303,7 +305,7 @@
<encoding>UTF-8</encoding> <encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput> <consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError> <failsOnError>true</failsOnError>
<excludes>**/istio/model/**,**/nacos/test/**,**/api/grpc/auto/**,**/consistency/entity/**</excludes> <excludes>**/istio/model/**,**/nacos/test/**,**/api/grpc/auto/**</excludes>
</configuration> </configuration>
<executions> <executions>
<execution> <execution>
@ -338,7 +340,6 @@
<exclude>**/consistency/entity/**</exclude> <exclude>**/consistency/entity/**</exclude>
<exclude>**/*.txt</exclude> <exclude>**/*.txt</exclude>
<exclude>**/*.factories</exclude> <exclude>**/*.factories</exclude>
<exclude>/console-ui/**</exclude>
</excludes> </excludes>
</configuration> </configuration>
<executions> <executions>
@ -734,6 +735,12 @@
<version>${hessian.version}</version> <version>${hessian.version}</version>
</dependency> </dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</dependency>
<!-- Apache commons --> <!-- Apache commons -->
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
@ -771,6 +778,7 @@
<version>${commons-cli.version}</version> <version>${commons-cli.version}</version>
</dependency> </dependency>
<!-- Logging libs --> <!-- Logging libs -->
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@ -833,6 +841,12 @@
<version>${httpasyncclient.version}</version> <version>${httpasyncclient.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>async-http-client</artifactId>
<version>${async-http-client.version}</version>
</dependency>
<!-- JDBC libs --> <!-- JDBC libs -->
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
@ -934,19 +948,19 @@
<artifactId>netty-all</artifactId> <artifactId>netty-all</artifactId>
<version>${netty-all.version}</version> <version>${netty-all.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>javax.annotation</groupId> <groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId> <artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version> <version>1.3.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.mina</groupId> <groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId> <artifactId>mina-core</artifactId>
<version>${mina-core.version}</version> <version>${mina-core.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
@ -967,7 +981,6 @@
</dependency> </dependency>
<!-- gRPC dependency start --> <!-- gRPC dependency start -->
<dependency> <dependency>
<groupId>io.grpc</groupId> <groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId> <artifactId>grpc-netty-shaded</artifactId>
@ -996,9 +1009,9 @@
</dependency> </dependency>
<!-- gRPC dependency end --> <!-- gRPC dependency end -->
<!-- Rsocket --> <!-- Rsocket -->
<dependency> <dependency>
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId> <artifactId>reactor-core</artifactId>
@ -1095,9 +1108,9 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<!-- Rsocket --> <!-- Rsocket -->
<dependency> <dependency>
<groupId>com.google.protobuf</groupId> <groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId> <artifactId>protobuf-java</artifactId>
@ -1141,7 +1154,7 @@
</dependency> </dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
<!--<distributionManagement>--> <!--<distributionManagement>-->
<!--<snapshotRepository>--> <!--<snapshotRepository>-->
<!--&lt;!&ndash; 这里的ID一定要在maven setting文件中存在于server下的ID &ndash;&gt;--> <!--&lt;!&ndash; 这里的ID一定要在maven setting文件中存在于server下的ID &ndash;&gt;-->
@ -1153,7 +1166,7 @@
<!--<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>--> <!--<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>-->
<!--</repository>--> <!--</repository>-->
<!--</distributionManagement>--> <!--</distributionManagement>-->
<repositories> <repositories>
<repository> <repository>
<id>central</id> <id>central</id>

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 1999-2018 Alibaba Group Holding Ltd. * Copyright 1999-2020 Alibaba Group Holding Ltd.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.nacos.client; package com.alibaba.nacos.test.config;
import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.PropertyKeyConst;
@ -55,16 +55,16 @@ public class ConfigTest {
public void before() throws Exception { public void before() throws Exception {
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848"); properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.67.159:8849"); //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.67.159:7001");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848,11.160.144.148:8848,127.0.0.1:8848"); //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848,11.160.144.148:8848,127.0.0.1:8848");
//"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848"); //"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848");
//"11.239.114.187:8848"); //"11.239.114.187:8848");
properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); //properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); //properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
configService = NacosFactory.createConfigService(properties); configService = NacosFactory.createConfigService(properties);
//Thread.sleep(2000L); //Thread.sleep(2000L);
} }
@ -85,22 +85,22 @@ public class ConfigTest {
public String getCurrentServer() { public String getCurrentServer() {
return "11.160.144.148:8848"; return "11.160.144.148:8848";
} }
@Override @Override
public List<String> getServerList() { public List<String> getServerList() {
return Lists.newArrayList("11.160.144.148:8848"); return Lists.newArrayList("11.160.144.148:8848");
} }
}); });
//client.start(); //client.start();
ConfigBatchListenRequest syncRequest = new ConfigBatchListenRequest(); ConfigBatchListenRequest syncRequest = new ConfigBatchListenRequest();
syncRequest.setListen(true); syncRequest.setListen(true);
final String dataId = "xiaochun.xxc"; final String dataId = "xiaochun.xxc";
final String group = "xiaochun.xxc"; final String group = "xiaochun.xxc";
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
System.out.println("100K start send 100 request..."); System.out.println("100K start send 100 request...");
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
StringBuilder listenConfigsBuilder = new StringBuilder(); StringBuilder listenConfigsBuilder = new StringBuilder();
listenConfigsBuilder.append(dataId + i).append(WORD_SEPARATOR); listenConfigsBuilder.append(dataId + i).append(WORD_SEPARATOR);
@ -114,7 +114,7 @@ public class ConfigTest {
} }
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
System.out.println("total cost:" + (end - start)); System.out.println("total cost:" + (end - start));
StringBuilder listenConfigsBuilder = new StringBuilder(); StringBuilder listenConfigsBuilder = new StringBuilder();
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
listenConfigsBuilder.append(dataId + i).append(WORD_SEPARATOR); listenConfigsBuilder.append(dataId + i).append(WORD_SEPARATOR);
@ -137,7 +137,7 @@ public class ConfigTest {
public void test333() throws Exception { public void test333() throws Exception {
Map<String, String> labels = new HashMap<String, String>(); Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK); labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET, labels); RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET, labels);
client.init(new ServerListFactory() { client.init(new ServerListFactory() {
@Override @Override
@ -165,7 +165,7 @@ public class ConfigTest {
syncRequest.addConfigListenContext(group, dataId, null, null); syncRequest.addConfigListenContext(group, dataId, null, null);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
System.out.println("send :" + System.currentTimeMillis()); System.out.println("send :" + System.currentTimeMillis());
RequestFuture requestFuture = client.requestFuture(syncRequest); RequestFuture requestFuture = client.requestFuture(syncRequest);
while (true) { while (true) {
Thread.sleep(1L); Thread.sleep(1L);
@ -189,33 +189,35 @@ public class ConfigTest {
public void test2() throws Exception { public void test2() throws Exception {
final String dataId = "xiaochun.xxc"; final String dataId = "xiaochun.xxc";
final String group = "xiaochun.xxc"; final String group = "xiaochun.xxc";
Random random = new Random();
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848"); properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848,11.160.144.148:8848");
//" //"
List<ConfigService> configServiceList = new ArrayList<ConfigService>(); List<ConfigService> configServiceList = new ArrayList<ConfigService>();
for (int i = 0; i < 300; i++) { for (int i = 0; i < 500; i++) {
ConfigService configService = NacosFactory.createConfigService(properties); ConfigService configService = NacosFactory.createConfigService(properties);
Listener listener = new AbstractListener() { Listener listener = new AbstractListener() {
@Override @Override
public void receiveConfigInfo(String configInfo) { public void receiveConfigInfo(String configInfo) {
System.out.println( System.out.println(
"receiveConfigInfo1 content:" + (System.currentTimeMillis() - Long.valueOf(configInfo))); "receiveConfigInfo1 content:" + (System.currentTimeMillis() - Long.valueOf(configInfo)));
} }
}; };
for (int j = 0; j < 50; j++) {
configService.addListener(dataId, group, listener); configService.addListener(dataId + random.nextInt(200), group, listener);
}
configServiceList.add(configService); configServiceList.add(configService);
System.out.println(configServiceList.size()); System.out.println(configServiceList.size());
} }
System.out.println("2"); System.out.println("2");
Thread th = new Thread(new Runnable() { Thread th = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
Random random = new Random(); Random random = new Random();
int times = 10000; int times = 10000;
while (times > 0) { while (times > 0) {
@ -223,14 +225,15 @@ public class ConfigTest {
boolean result = configService.publishConfig(dataId, group, "" + System.currentTimeMillis()); boolean result = configService.publishConfig(dataId, group, "" + System.currentTimeMillis());
times--; times--;
System.out.println("发布配置:" + result);
Thread.sleep(1000L); Thread.sleep(1000L);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }
}); });
th.start(); th.start();
@ -239,56 +242,51 @@ public class ConfigTest {
@Test @Test
public void test() throws Exception { public void test() throws Exception {
//SnapShotSwitch.setIsSnapShot(false); //SnapShotSwitch.setIsSnapShot(false);
final Random random = new Random(); final Random random = new Random(System.currentTimeMillis());
final String dataId = "xiaochun.xxc"; final String dataId = "xiaochun.xxc";
final String group = "xiaochun.xxc"; final String group = "xiaochun.xxc";
Listener listener = new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
String[] s = configInfo.split("__");
System.out.println("receiveConfigInfo1 content:" + (System.currentTimeMillis() - Long.valueOf(s[1])));
}
};
Thread th = new Thread(new Runnable() { Thread th = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
int times = 1000; int times = 10000;
while (times > 0) { while (times > 0) {
try { try {
String content1 = System.currentTimeMillis() + ""; String content1 = new String(new byte[5000]) + "__" + System.currentTimeMillis();
boolean b = configService.publishConfig(dataId + random.nextInt(20), group, content1); System.out.println(content1.length());
boolean b = configService.publishConfig(dataId + random.nextInt(400), group, content1);
times--; times--;
Thread.sleep(1000L); System.out.println("发布配置:" + b);
Thread.sleep(500L);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
System.out.println(times); System.out.println(times);
System.out.println("Write Done"); System.out.println("Write Done");
} }
}); });
th.start(); th.start();
Listener listener = new AbstractListener() { for (int i = 0; i < 500; i++) {
@Override String content1 = System.currentTimeMillis() + "";
public void receiveConfigInfo(String configInfo) { configService.getConfigAndSignListener(dataId + i, group, 3000L, listener);
System.out.println(
"receiveConfigInfo1 content:" + (System.currentTimeMillis() - Long.valueOf(configInfo)));
}
};
for (int i = 0; i < 20; i++) {
final int ls = i;
configService.addListener(dataId + i, group, listener);
} }
Thread.sleep(1000000L); Thread.sleep(1000000L);
for (int i = 0; i < 20; i++) {
configService.removeListener(dataId + i, group, listener);
}
System.out.println("remove listens."); System.out.println("remove listens.");
Scanner scanner = new Scanner(System.in); Scanner scanner = new Scanner(System.in);
@ -312,18 +310,18 @@ public class ConfigTest {
boolean result = configService.publishConfig(dataId, group, content); boolean result = configService.publishConfig(dataId, group, content);
//Assert.assertTrue(result); //Assert.assertTrue(result);
Listener listener = new AbstractListener() { Listener listener = new AbstractListener() {
@Override @Override
public void receiveConfigInfo(String configInfo) { public void receiveConfigInfo(String configInfo) {
System.out.println("receiveConfigInfo1 :" + configInfo); System.out.println("receiveConfigInfo1 :" + configInfo);
} }
}; };
configService.getConfigAndSignListener(dataId, group, 5000, listener); configService.getConfigAndSignListener(dataId, group, 5000, listener);
System.out.println("Add Listen config.."); System.out.println("Add Listen config..");
Thread th = new Thread(new Runnable() { Thread th = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -333,21 +331,20 @@ public class ConfigTest {
while (times > 0) { while (times > 0) {
try { try {
configService.publishConfig(dataId, group, "value" + System.currentTimeMillis()); configService.publishConfig(dataId, group, "value" + System.currentTimeMillis());
times--; times--;
Thread.sleep(5000L); Thread.sleep(5000L);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
System.out.println(times); System.out.println(times);
System.out.println("Write Done"); System.out.println("Write Done");
} }
}); });
th.start(); th.start();
Scanner scanner = new Scanner(System.in); Scanner scanner = new Scanner(System.in);
System.out.println("input content"); System.out.println("input content");