[ISSUE #1097] Naming support grpc server forward request (#3480)

* re subscribe service when reconnect

* change grpc instance maintain by heartbeat

* Add lifecycle for remoting workers

* Refactor naming client redo when reconnect

* Fix checkstyle and PMD

* Implement forward instance request to responsible server

* Implement forward heart beat to servers
This commit is contained in:
杨翊 SionYang 2020-07-30 17:00:57 +08:00 committed by GitHub
parent b1a587dc72
commit 5f528d8036
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 1728 additions and 73 deletions

View File

@ -23,7 +23,7 @@ import com.alibaba.nacos.api.remote.request.Request;
* @author liuzunfei
* @version $Id: ConfigCommonRequest.java, v 0.1 2020年07月13日 9:05 PM liuzunfei Exp $
*/
public abstract class ConfigCommonRequest extends Request {
public abstract class AbstractConfigRequest extends Request {
@Override
public String getModule() {

View File

@ -22,7 +22,7 @@ package com.alibaba.nacos.api.config.remote.request;
* @author liuzunfei
* @version $Id: ConfigBatchListenRequest.java, v 0.1 2020年07月27日 7:46 PM liuzunfei Exp $
*/
public class ConfigBatchListenRequest extends ConfigCommonRequest {
public class ConfigBatchListenRequest extends AbstractConfigRequest {
private static final String Y = "Y";

View File

@ -25,7 +25,7 @@ import java.util.Map;
* @author liuzunfei
* @version $Id: ConfigPublishRequest.java, v 0.1 2020年07月16日 4:30 PM liuzunfei Exp $
*/
public class ConfigPublishRequest extends ConfigCommonRequest {
public class ConfigPublishRequest extends AbstractConfigRequest {
String dataId;
@ -59,7 +59,7 @@ public class ConfigPublishRequest extends ConfigCommonRequest {
*/
public void putAdditonalParam(String key, String value) {
if (additonMap == null) {
additonMap = new HashMap<String, String>();
additonMap = new HashMap<String, String>(2);
}
additonMap.put(key, value);
}

View File

@ -22,7 +22,7 @@ package com.alibaba.nacos.api.config.remote.request;
* @author liuzunfei
* @version $Id: ConfigQueryRequest.java, v 0.1 2020年07月13日 9:06 PM liuzunfei Exp $
*/
public class ConfigQueryRequest extends ConfigCommonRequest {
public class ConfigQueryRequest extends AbstractConfigRequest {
private String dataId;

View File

@ -22,7 +22,7 @@ package com.alibaba.nacos.api.config.remote.request;
* @author liuzunfei
* @version $Id: ConfigRemoveRequest.java, v 0.1 2020年07月16日 4:31 PM liuzunfei Exp $
*/
public class ConfigRemoveRequest extends ConfigCommonRequest {
public class ConfigRemoveRequest extends AbstractConfigRequest {
String dataId;
@ -119,4 +119,4 @@ public class ConfigRemoveRequest extends ConfigCommonRequest {
public void setTenant(String tenant) {
this.tenant = tenant;
}
}
}

View File

@ -35,4 +35,8 @@ public class NamingRemoteConstants {
public static final String NOTIFY_SUBSCRIBER = "notifySubscriber";
public static final String LIST_SERVICE = "listService";
public static final String FORWARD_INSTANCE = "forwardInstance";
public static final String FORWARD_HEART_BEAT = "forwardHeartBeat";
}

View File

@ -23,7 +23,7 @@ import com.alibaba.nacos.api.remote.request.Request;
*
* @author liuzunfei
*/
public abstract class NamingCommonRequest extends Request {
public abstract class AbstractNamingRequest extends Request {
private String namespace;
@ -31,10 +31,10 @@ public abstract class NamingCommonRequest extends Request {
private String groupName;
public NamingCommonRequest() {
public AbstractNamingRequest() {
}
public NamingCommonRequest(String namespace, String serviceName, String groupName) {
public AbstractNamingRequest(String namespace, String serviceName, String groupName) {
this.namespace = namespace;
this.serviceName = serviceName;
this.groupName = groupName;

View File

@ -23,7 +23,7 @@ import com.alibaba.nacos.api.naming.pojo.Instance;
*
* @author xiweng.yy
*/
public class InstanceRequest extends NamingCommonRequest {
public class InstanceRequest extends AbstractNamingRequest {
private String type;

View File

@ -23,7 +23,7 @@ import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
*
* @author xiweng.yy
*/
public class ServiceListRequest extends NamingCommonRequest {
public class ServiceListRequest extends AbstractNamingRequest {
private int pageNo;

View File

@ -23,7 +23,7 @@ import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
*
* @author xiweng.yy
*/
public class ServiceQueryRequest extends NamingCommonRequest {
public class ServiceQueryRequest extends AbstractNamingRequest {
private String cluster;

View File

@ -23,7 +23,7 @@ import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
*
* @author xiweng.yy
*/
public class SubscribeServiceRequest extends NamingCommonRequest {
public class SubscribeServiceRequest extends AbstractNamingRequest {
private boolean subscribe;

View File

@ -27,6 +27,7 @@ import java.util.concurrent.Future;
* @author liuzunfei
* @version $Id: Connection.java, v 0.1 2020年07月13日 7:08 PM liuzunfei Exp $
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Connection {
public static final String HEALTHY = "healthy";

View File

@ -23,6 +23,9 @@ package com.alibaba.nacos.api.remote.connection;
*/
public enum ConnectionType {
/**
* gRPC connection.
*/
GRPC("GRPC", "Grpc Connection");
String type;

View File

@ -22,6 +22,7 @@ package com.alibaba.nacos.api.remote.request;
* @author liuzunfei
* @version $Id: InternalRequest.java, v 0.1 2020年07月22日 8:33 PM liuzunfei Exp $
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class InternalRequest extends Request {
@Override

View File

@ -24,6 +24,7 @@ import java.util.Map;
*
* @author liuzunfei
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Request {
private final Map<String, String> headers = new HashMap<String, String>();

View File

@ -24,6 +24,7 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
* @author liuzunfei
* @version $Id: Response.java, v 0.1 2020年07月13日 6:03 PM liuzunfei Exp $
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Response {
int resultCode = ResponseCode.SUCCESS.getCode();

View File

@ -23,8 +23,14 @@ package com.alibaba.nacos.api.remote.response;
*/
public enum ResponseCode {
/**
* Request success.
*/
SUCCESS(200, "response ok"),
/**
* Request failed.
*/
FAIL(500, "response fail");
int code;

View File

@ -22,6 +22,7 @@ package com.alibaba.nacos.api.remote.response;
* @author liuzunfei
* @version $Id: ServerPushResponse.java, v 0.1 2020年07月20日 1:21 PM liuzunfei Exp $
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class ServerPushResponse extends Response {
/**

View File

@ -56,10 +56,13 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
private final RpcClient rpcClient;
private final NamingGrpcConnectionEventListener namingGrpcConnectionEventListener;
public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory,
ServiceInfoHolder serviceInfoHolder) throws NacosException {
this.namespaceId = namespaceId;
this.rpcClient = RpcClientFactory.getClient("naming");
this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this);
start(serverListFactory, serviceInfoHolder);
}
@ -67,6 +70,7 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
rpcClient.init(serverListFactory);
rpcClient.start();
rpcClient.registerServerPushResponseHandler(new NamingPushResponseHandler(serviceInfoHolder));
rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
}
@Override
@ -76,6 +80,7 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
namingGrpcConnectionEventListener.cacheInstanceForRedo(serviceName, groupName, instance);
}
@Override
@ -86,6 +91,7 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.DE_REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
namingGrpcConnectionEventListener.removeInstanceForRedo(serviceName, groupName, instance);
}
@Override
@ -147,14 +153,17 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceNameWithGroup, clusters,
true);
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
namingGrpcConnectionEventListener.cacheSubscriberForRedo(serviceNameWithGroup, clusters);
return response.getServiceInfo();
}
@Override
public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException {
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId,
NamingUtils.getGroupedName(serviceName, groupName), clusters, false);
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceNameWithGroup, clusters,
false);
requestToServer(request, SubscribeServiceResponse.class);
namingGrpcConnectionEventListener.removeSubscriberForRedo(serviceNameWithGroup, clusters);
}
@Override

View File

@ -0,0 +1,132 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote.gprc;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.remote.ConnectionEventListener;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Naming client gprc connection event listener.
*
* <p>
* When connection reconnect to server, redo the register and subscribe.
* </p>
*
* @author xiweng.yy
*/
public class NamingGrpcConnectionEventListener implements ConnectionEventListener {
private final NamingGrpcClientProxy clientProxy;
private final ConcurrentMap<String, Set<Instance>> registeredInstanceCached = new ConcurrentHashMap<String, Set<Instance>>();
private final Set<String> subscribes = new ConcurrentHashSet<String>();
public NamingGrpcConnectionEventListener(NamingGrpcClientProxy clientProxy) {
this.clientProxy = clientProxy;
}
@Override
public void onConnected() {
}
@Override
public void onReconnected() {
redoSubscribe();
redoRegisterEachService();
}
private void redoSubscribe() {
for (String each : subscribes) {
ServiceInfo serviceInfo = ServiceInfo.fromKey(each);
try {
clientProxy.subscribe(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters());
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.warn(String.format("re subscribe service %s failed", serviceInfo.getName()), e);
}
}
}
private void redoRegisterEachService() {
for (Map.Entry<String, Set<Instance>> each : registeredInstanceCached.entrySet()) {
String serviceName = NamingUtils.getServiceName(each.getKey());
String groupName = NamingUtils.getGroupName(each.getKey());
redoRegisterEachInstance(serviceName, groupName, each.getValue());
}
}
private void redoRegisterEachInstance(String serviceName, String groupName, Set<Instance> instances) {
for (Instance each : instances) {
try {
clientProxy.registerService(serviceName, groupName, each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER
.warn(String.format("redo register for service %s@@%s failed", groupName, serviceName), e);
}
}
}
@Override
public void onDisConnect() {
}
/**
* Cache registered instance for redo.
*
* @param serviceName service name
* @param groupName group name
* @param instance registered instance
*/
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
registeredInstanceCached.putIfAbsent(key, new ConcurrentHashSet<Instance>());
registeredInstanceCached.get(key).add(instance);
}
/**
* Remove registered instance for redo.
*
* @param serviceName service name
* @param groupName group name
* @param instance registered instance
*/
public void removeInstanceForRedo(String serviceName, String groupName, Instance instance) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
Set<Instance> instances = registeredInstanceCached.get(key);
if (null != instances) {
instances.remove(instance);
}
}
public void cacheSubscriberForRedo(String fullServiceName, String cluster) {
subscribes.add(ServiceInfo.getKey(fullServiceName, cluster));
}
public void removeSubscriberForRedo(String fullServiceName, String cluster) {
subscribes.remove(ServiceInfo.getKey(fullServiceName, cluster));
}
}

View File

@ -23,11 +23,11 @@ import com.alibaba.nacos.api.grpc.GrpcResponse;
import com.alibaba.nacos.api.grpc.RequestGrpc;
import com.alibaba.nacos.api.grpc.RequestStreamGrpc;
import com.alibaba.nacos.api.remote.ResponseRegistry;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.request.HeartBeatRequest;
import com.alibaba.nacos.api.remote.request.PushAckRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.ServerCheckRequest;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.response.ConnectionUnregisterResponse;
import com.alibaba.nacos.api.remote.response.PlainBodyResponse;
import com.alibaba.nacos.api.remote.response.Response;
@ -101,18 +101,18 @@ public class GrpcClient extends RpcClient {
ManagedChannel managedChannelTemp = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext()
.build();
RequestGrpc.RequestFutureStub grpcServiceStubTemp = RequestGrpc.newFutureStub(managedChannelTemp);
boolean checkSucess = serverCheck(grpcServiceStubTemp);
if (checkSucess) {
return grpcServiceStubTemp;
} else {
shuntDownChannel(managedChannelTemp);
return null;
}
}
/**
@ -129,7 +129,7 @@ public class GrpcClient extends RpcClient {
private void connectToServer() {
rpcClientStatus.compareAndSet(RpcClientStatus.INITED, RpcClientStatus.STARTING);
GrpcServerInfo serverInfo = nextServer();
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.serverIp,
serverInfo.serverPort);
@ -142,8 +142,8 @@ public class GrpcClient extends RpcClient {
bindRequestStream(requestStreamStubTemp);
//switch current channel and stub
channel = (ManagedChannel) newChannelStubTemp.getChannel();
grpcStreamServiceStub = requestStreamStubTemp;
grpcFutureServiceStub = grpcFutureServiceStubTemp;
grpcStreamServiceStub = requestStreamStubTemp;
rpcClientStatus.set(RpcClientStatus.RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
notifyConnected();
@ -154,7 +154,7 @@ public class GrpcClient extends RpcClient {
@Override
public void start() throws NacosException {
if (rpcClientStatus.get() == RpcClientStatus.WAIT_INIT) {
LOGGER.error("RpcClient has not init yet, please check init ServerListFactory...");
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "RpcClient not init yet");
@ -162,22 +162,22 @@ public class GrpcClient extends RpcClient {
if (rpcClientStatus.get() == RpcClientStatus.RUNNING || rpcClientStatus.get() == RpcClientStatus.STARTING) {
return;
}
connectToServer();
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
sendBeat();
}
}, 0, 3000, TimeUnit.MILLISECONDS);
super.registerServerPushResponseHandler(new ServerPushResponseHandler() {
@Override
public void responseReply(Response response) {
if (response instanceof ConnectResetResponse) {
try {
if (!isRunning()) {
return;
}
@ -189,7 +189,7 @@ public class GrpcClient extends RpcClient {
}
}
});
eventExecutor.submit(new Runnable() {
@Override
public void run() {
@ -213,7 +213,7 @@ public class GrpcClient extends RpcClient {
* switch a new server.
*/
private void switchServer(final boolean onStarting) {
//try to get operate lock.
boolean lockResult = lock.tryLock();
if (!lockResult) {
@ -248,7 +248,7 @@ public class GrpcClient extends RpcClient {
if (newChannelStubTemp != null) {
RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
bindRequestStream(requestStreamStubTemp);
final ManagedChannel depratedChannel = channel;
//switch current channel and stub
@ -278,10 +278,10 @@ public class GrpcClient extends RpcClient {
* Send Heart Beat Request.
*/
public void sendBeat() {
int maxRetryTimes = 3;
while (maxRetryTimes > 0) {
try {
if (!isRunning()) {
return;
@ -305,7 +305,7 @@ public class GrpcClient extends RpcClient {
LOGGER.error("Send heart beat error, ", e);
}
}
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
LOGGER.warn("Max retry times for send heart beat fail reached,trying to switch server... ");
switchServer(false);
@ -314,7 +314,7 @@ public class GrpcClient extends RpcClient {
private GrpcMetadata buildMeta() {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.setVersion(ClientCommonUtils.VERSION).build();
return meta;
}
@ -326,11 +326,11 @@ public class GrpcClient extends RpcClient {
*/
private boolean serverCheck(RequestGrpc.RequestFutureStub requestBlockingStub) {
try {
ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta())
.setType(serverCheckRequest.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(serverCheckRequest)))
.setType(serverCheckRequest.getType()).setBody(
Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(serverCheckRequest)))
.build()).build();
ListenableFuture<GrpcResponse> responseFuture = requestBlockingStub.request(streamRequest);
GrpcResponse response = responseFuture.get();
@ -351,7 +351,7 @@ public class GrpcClient extends RpcClient {
streamStub.requestStream(streamRequest, new StreamObserver<GrpcResponse>() {
@Override
public void onNext(GrpcResponse grpcResponse) {
LOGGER.debug(" stream response receive ,original reponse :{}", grpcResponse);
try {
sendAckResponse(grpcResponse.getAck(), true);
@ -367,14 +367,14 @@ public class GrpcClient extends RpcClient {
myresponse.setBodyString(bodyString);
response = myresponse;
}
serverPushResponseListeners.forEach(new Consumer<ServerPushResponseHandler>() {
@Override
public void accept(ServerPushResponseHandler serverPushResponseHandler) {
serverPushResponseHandler.responseReply(response);
}
});
} catch (Exception e) {
e.printStackTrace(System.out);
LOGGER.error("error tp process server push response :{}", grpcResponse);
@ -406,11 +406,11 @@ public class GrpcClient extends RpcClient {
@Override
public Response request(Request request) throws NacosException {
int maxRetryTimes = 3;
while (maxRetryTimes > 0) {
try {
GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).setType(request.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request))))
.build();
@ -425,10 +425,10 @@ public class GrpcClient extends RpcClient {
LOGGER.error("grpc client request error, retry...", e.getMessage(), e);
}
}
LOGGER.warn("Max retry times for request fail reached.");
throw new NacosException(NacosException.SERVER_ERROR, "Fail to request.");
}
private Response convertResponse(GrpcResponse grpcResponse) {

View File

@ -85,6 +85,7 @@ public class DefaultPublisher extends Thread implements EventPublisher {
}
}
@Override
public long currentEventSize() {
return queue.size();
}

View File

@ -52,5 +52,4 @@ public class ServerLoaderController {
return ResponseEntity.ok().body("success");
}
}

View File

@ -75,7 +75,7 @@ public class ConnectionManager {
if (remove != null) {
remove.closeGrapcefully();
Loggers.GRPC.info(" connection unregistered successfully,connectionid = {} ", connectionId);
clientConnectionEventListenerRegistry.notifyClientConnected(remove);
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
}

View File

@ -23,7 +23,9 @@ import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.request.RequestTypeConstants;
import com.alibaba.nacos.api.remote.response.HeartBeatResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.remote.event.RemotingHeartBeatEvent;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -51,6 +53,7 @@ public class HeartBeatRequestHandler extends RequestHandler {
public Response handle(Request request, RequestMeta meta) throws NacosException {
String connectionId = meta.getConnectionId();
connectionManager.refreshActiveTime(connectionId);
NotifyCenter.publishEvent(new RemotingHeartBeatEvent(connectionId, meta.getClientIp(), meta.getClientVersion()));
return new HeartBeatResponse();
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.event;
import com.alibaba.nacos.common.notify.Event;
/**
* Remoting connection heart beat event.
*
* @author xiweng.yy
*/
public class RemotingHeartBeatEvent extends Event {
private final String connectionId;
private final String clientIp;
private final String clientVersion;
public RemotingHeartBeatEvent(String connectionId, String clientIp, String clientVersion) {
this.connectionId = connectionId;
this.clientIp = clientIp;
this.clientVersion = clientVersion;
}
public String getConnectionId() {
return connectionId;
}
public String getClientIp() {
return clientIp;
}
public String getClientVersion() {
return clientVersion;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
/**
* Cluster client.
*
* @author xiweng.yy
*/
public interface ClusterClient {
/**
* Send request to target server.
*
* @param request request
* @return response
* @throws NacosException nacos exception
*/
Response request(Request request) throws NacosException;
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.cluster.remote.grpc.GrpcClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Cluster client proxy.
*
* @author xiweng.yy
*/
@Component
public class ClusterClientManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterClientManager.class);
private final ConcurrentMap<String, ClusterClient> clientMap = new ConcurrentHashMap<>();
private final ServerMemberManager memberManager;
public ClusterClientManager(ServerMemberManager memberManager) {
this.memberManager = memberManager;
}
/**
* Init cluster client manager.
*/
@PostConstruct
public void init() {
for (Member each : memberManager.allMembersWithoutSelf()) {
clientMap.put(each.getAddress(), new GrpcClient(each.getAddress()));
}
for (ClusterClient each : clientMap.values()) {
try {
((GrpcClient) each).start();
} catch (NacosException nacosException) {
LOGGER.error("Create cluster connection failed", nacosException);
}
}
}
public boolean hasClientForMember(String memberAddress) {
return clientMap.containsKey(memberAddress);
}
public ClusterClient getClusterClient(String memberAddress) {
return clientMap.getOrDefault(memberAddress, null);
}
public Collection<ClusterClient> getAllClusterClient() {
return clientMap.values();
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster.remote;
import com.alibaba.nacos.api.remote.connection.Connection;
import com.alibaba.nacos.api.remote.connection.ConnectionMetaInfo;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.api.remote.response.ServerPushResponse;
import java.util.concurrent.Future;
/**
* Cluster connection.
*
* @author xiweng.yy
*/
public class ClusterConnection extends Connection {
public ClusterConnection(ConnectionMetaInfo metaInfo) {
super(metaInfo);
}
@Override
public boolean sendPush(ServerPushResponse request, long timeoutMills) throws Exception {
return false;
}
@Override
public boolean sendPushNoAck(ServerPushResponse request) throws Exception {
return false;
}
@Override
public Future<Boolean> sendPushWithFuture(ServerPushResponse request) throws Exception {
return null;
}
@Override
public void sendPushCallBackWithCallBack(ServerPushResponse request, PushCallBack callBack) throws Exception {
}
@Override
public void closeGrapcefully() {
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.lifecycle.Closeable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
/**
* abstract remote client to connect to server.
*
* @author liuzunfei
* @version $Id: RpcClient.java, v 0.1 2020年07月13日 9:15 PM liuzunfei Exp $
*/
public abstract class RpcClient implements Closeable, ClusterClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
protected String connectionId;
protected String target;
protected AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<RpcClientStatus>(
RpcClientStatus.WAIT_INIT);
protected ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.config.grpc.worker");
t.setDaemon(true);
return t;
}
});
public RpcClient() {
}
public RpcClient(String target) {
init(target);
}
/**
* init server list factory.
*
* @param target target address
*/
public void init(String target) {
if (!isWaitInited()) {
return;
}
this.connectionId = UUID.randomUUID().toString();
rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITED);
this.target = target;
LOGGER.info("RpcClient init ,connectionId={}, target ={}", this.connectionId, target);
}
/**
* check is this client is inited.
*
* @return true if is waiting init
*/
public boolean isWaitInited() {
return this.rpcClientStatus.get() == RpcClientStatus.WAIT_INIT;
}
/**
* check is this client is running.
*
* @return true if is running
*/
public boolean isRunning() {
return this.rpcClientStatus.get() == RpcClientStatus.RUNNING;
}
/**
* check is this client is in init status,have not start th client.
*
* @return true if is init
*/
public boolean isInitStatus() {
return this.rpcClientStatus.get() == RpcClientStatus.INITED;
}
/**
* check is this client is in starting process.
*
* @return true if is starting
*/
public boolean isStarting() {
return this.rpcClientStatus.get() == RpcClientStatus.STARTING;
}
/**
* Start this client.
*
* @throws NacosException nacos exception
*/
public abstract void start() throws NacosException;
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster.remote;
/**
* RpcClientStatus.
* @author liuzunfei
* @version $Id: RpcClientStatus.java, v 0.1 2020年07月14日 3:49 PM liuzunfei Exp $
*/
public enum RpcClientStatus {
WAIT_INIT(0, "wait to init serverlist factory... "),
INITED(1, "server list factory is ready,wait to start"),
STARTING(2, "server list factory is ready,wait to start"),
RUNNING(3, "client is running..."),
SWITCHING_SERVER(5, "reconnecting...");
int status;
String desc;
RpcClientStatus(int status, String desc) {
this.status = status;
this.desc = desc;
}
}

View File

@ -0,0 +1,373 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster.remote.grpc;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.GrpcMetadata;
import com.alibaba.nacos.api.grpc.GrpcRequest;
import com.alibaba.nacos.api.grpc.GrpcResponse;
import com.alibaba.nacos.api.grpc.RequestGrpc;
import com.alibaba.nacos.api.grpc.RequestStreamGrpc;
import com.alibaba.nacos.api.remote.ResponseRegistry;
import com.alibaba.nacos.api.remote.request.HeartBeatRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.ServerCheckRequest;
import com.alibaba.nacos.api.remote.response.ConnectionUnregisterResponse;
import com.alibaba.nacos.api.remote.response.PlainBodyResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseTypeConstants;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.cluster.remote.RpcClient;
import com.alibaba.nacos.naming.cluster.remote.RpcClientStatus;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* gRPC Client.
*
* @author liuzunfei
* @version $Id: GrpcClient.java, v 0.1 2020年07月13日 9:16 PM liuzunfei Exp $
*/
public class GrpcClient extends RpcClient {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class);
/**
* change listeners handler registry.
*/
protected List<ServerPushResponseHandler> serverPushResponseListeners = new ArrayList<ServerPushResponseHandler>();
protected ManagedChannel channel;
protected RequestStreamGrpc.RequestStreamStub grpcStreamServiceStub;
protected RequestGrpc.RequestBlockingStub grpcServiceStub;
public GrpcClient(String target) {
super(target);
}
/**
* create a new channel .
*
* @param serverIp serverIp.
* @param serverPort serverPort.
* @return if server check success,return stub.
*/
private RequestGrpc.RequestBlockingStub createNewChannelStub(String serverIp, int serverPort) {
ManagedChannel managedChannelTemp = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext()
.build();
RequestGrpc.RequestBlockingStub grpcServiceStubTemp = RequestGrpc.newBlockingStub(managedChannelTemp);
boolean checkSuccess = serverCheck(grpcServiceStubTemp);
LOGGER.info(String.format("create cluster channel to %s:%d result %s", serverIp, serverPort, checkSuccess));
if (checkSuccess) {
return grpcServiceStubTemp;
} else {
shuntDownChannel(managedChannelTemp);
return null;
}
}
/**
* shutdown a channel.
*
* @param managedChannel channel to be shutdown.
*/
private void shuntDownChannel(ManagedChannel managedChannel) {
if (managedChannel != null && !managedChannel.isShutdown()) {
managedChannel.shutdownNow();
}
}
private void connectToServer() {
rpcClientStatus.compareAndSet(RpcClientStatus.INITED, RpcClientStatus.STARTING);
GrpcServerInfo serverInfo = resolveServerInfo(target);
RequestGrpc.RequestBlockingStub newChannelStubTemp = createNewChannelStub(serverInfo.serverIp,
serverInfo.serverPort);
if (newChannelStubTemp != null) {
RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
bindRequestStream(requestStreamStubTemp);
//switch current channel and stub
channel = (ManagedChannel) newChannelStubTemp.getChannel();
grpcStreamServiceStub = requestStreamStubTemp;
grpcServiceStub = newChannelStubTemp;
rpcClientStatus.set(RpcClientStatus.RUNNING);
} else {
switchServer(true);
}
}
@Override
public void start() throws NacosException {
if (rpcClientStatus.get() == RpcClientStatus.WAIT_INIT) {
LOGGER.error("RpcClient has not init yet, please check init ServerListFactory...");
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "RpcClient not init yet");
}
if (rpcClientStatus.get() == RpcClientStatus.RUNNING || rpcClientStatus.get() == RpcClientStatus.STARTING) {
return;
}
connectToServer();
executorService.scheduleWithFixedDelay(() -> sendBeat(), 0, 3000, TimeUnit.MILLISECONDS);
}
/**
* switch a new server.
*/
private void switchServer(final boolean onStarting) {
if (onStarting) {
// access on startup fail
rpcClientStatus.set(RpcClientStatus.SWITCHING_SERVER);
} else {
// access from running status, sendbeat fail or receive reset message from server.
boolean changeStatusSuccess = rpcClientStatus
.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.SWITCHING_SERVER);
if (!changeStatusSuccess) {
return;
}
}
executorService.schedule(() -> {
// loop until start client success.
while (!isRunning()) {
//1.get a new server
GrpcServerInfo serverInfo = resolveServerInfo(target);
//2.get a new channel to new server
RequestGrpc.RequestBlockingStub newChannelStubTemp = createNewChannelStub(serverInfo.serverIp,
serverInfo.serverPort);
if (newChannelStubTemp != null) {
RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
bindRequestStream(requestStreamStubTemp);
final ManagedChannel depratedChannel = channel;
//switch current channel and stub
channel = (ManagedChannel) newChannelStubTemp.getChannel();
grpcStreamServiceStub = requestStreamStubTemp;
grpcServiceStub = newChannelStubTemp;
rpcClientStatus.getAndSet(RpcClientStatus.RUNNING);
shuntDownChannel(depratedChannel);
continue;
}
try {
//sleep 3 second to switch next server.
Thread.sleep(3000L);
} catch (InterruptedException e) {
// Do nothing.
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
/**
* Send Heart Beat Request.
*/
public void sendBeat() {
try {
if (!isRunning()) {
return;
}
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta())
.setType(heartBeatRequest.getType()).setBody(
Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest)))
.build()).build();
GrpcResponse response = grpcServiceStub.request(streamRequest);
if (ResponseTypeConstants.CONNECION_UNREGISTER.equals(response.getType())) {
LOGGER.warn("Send heart beat fail,connection is not registerd,trying to switch server ");
switchServer(false);
}
} catch (StatusRuntimeException e) {
if (Status.UNAVAILABLE.getCode().equals(e.getStatus().getCode())) {
LOGGER.warn("Send heart beat fail,server is not avaliable now,trying to switch server ");
switchServer(false);
return;
}
throw e;
} catch (Exception e) {
LOGGER.error("Send heart beat error, ", e);
}
}
private GrpcMetadata buildMeta() {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localServer())
.setVersion(UtilsAndCommons.SERVER_VERSION).build();
return meta;
}
/**
* chenck server if ok.
*
* @param requestBlockingStub requestBlockingStub used to check server.
* @return
*/
private boolean serverCheck(RequestGrpc.RequestBlockingStub requestBlockingStub) {
try {
ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta())
.setType(serverCheckRequest.getType()).setBody(
Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(serverCheckRequest)))
.build()).build();
GrpcResponse response = requestBlockingStub.request(streamRequest);
return response != null;
} catch (Exception e) {
return false;
}
}
/**
* bind request stream observer (send a connection).
*
* @param streamStub streamStub to bind.
*/
private void bindRequestStream(RequestStreamGrpc.RequestStreamStub streamStub) {
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).build();
LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest);
streamStub.requestStream(streamRequest, new StreamObserver<GrpcResponse>() {
@Override
public void onNext(GrpcResponse grpcResponse) {
LOGGER.debug(" stream response receive ,original reponse :{}", grpcResponse);
try {
String message = grpcResponse.getBody().getValue().toStringUtf8();
String type = grpcResponse.getType();
String bodyString = grpcResponse.getBody().getValue().toStringUtf8();
Class classByType = ResponseRegistry.getClassByType(type);
final Response response;
if (classByType != null) {
response = (Response) JacksonUtils.toObj(bodyString, classByType);
} else {
PlainBodyResponse myresponse = JacksonUtils.toObj(bodyString, PlainBodyResponse.class);
myresponse.setBodyString(bodyString);
response = myresponse;
}
serverPushResponseListeners
.forEach(serverPushResponseHandler -> serverPushResponseHandler.responseReply(response));
} catch (Exception e) {
LOGGER.error("error tp process server push response :{}", grpcResponse);
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
});
}
@Override
public Response request(Request request) throws NacosException {
if (!this.isRunning()) {
throw new IllegalStateException("Client is not connected to any server now,please retry later");
}
try {
GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).setType(request.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request)))).build();
GrpcResponse response = grpcServiceStub.request(grpcrequest);
String type = response.getType();
String bodyString = response.getBody().getValue().toStringUtf8();
// transfrom grpcResponse to response model
Class classByType = ResponseRegistry.getClassByType(type);
if (classByType != null) {
Object object = JacksonUtils.toObj(bodyString, classByType);
if (object instanceof ConnectionUnregisterResponse) {
switchServer(false);
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "connection is not connected.");
}
return (Response) object;
} else {
PlainBodyResponse myresponse = JacksonUtils.toObj(bodyString, PlainBodyResponse.class);
myresponse.setBodyString(bodyString);
return (PlainBodyResponse) myresponse;
}
} catch (StatusRuntimeException e) {
if (Status.UNAVAILABLE.equals(e.getStatus())) {
LOGGER.warn("request fail,server is not avaliable now,trying to switch server ");
switchServer(false);
}
throw e;
} catch (Exception e) {
LOGGER.error("grpc client request error, error message is ", e.getMessage(), e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
@Override
public void shutdown() throws NacosException {
if (this.channel != null && !this.channel.isShutdown()) {
this.channel.shutdownNow();
}
}
private GrpcServerInfo resolveServerInfo(String serverAddress) {
GrpcServerInfo serverInfo = new GrpcServerInfo();
serverInfo.serverPort = 1000;
if (serverAddress.contains("http")) {
serverInfo.serverIp = serverAddress.split(":")[1].replaceAll("//", "");
serverInfo.serverPort += Integer.valueOf(serverAddress.split(":")[2].replaceAll("//", ""));
} else {
serverInfo.serverIp = serverAddress.split(":")[0];
serverInfo.serverPort += Integer.valueOf(serverAddress.split(":")[1]);
}
return serverInfo;
}
class GrpcServerInfo {
String serverIp;
int serverPort;
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster.remote.grpc;
import com.alibaba.nacos.api.remote.response.Response;
/**
* ServerPushResponseHandler.
*
* @author liuzunfei
* @version $Id: ServerPushResponseHandler.java, v 0.1 2020年07月14日 11:41 AM liuzunfei Exp $
*/
public interface ServerPushResponseHandler<T> {
/**
* Handle logic when response received.
* @param response response
*/
void responseReply(Response response);
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster.remote.request;
import com.alibaba.nacos.api.remote.request.Request;
/**
* Cluster request.
*
* @author xiweng.yy
*/
public abstract class AbstractClusterRequest extends Request {
@Override
public String getModule() {
return "cluster";
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster.remote.request;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
/**
* Forward heart beat request.
*
* @author xiweng.yy
*/
public class ForwardHeartBeatRequest extends AbstractClusterRequest {
private String connectionId;
public ForwardHeartBeatRequest() {
}
public ForwardHeartBeatRequest(String connectionId) {
this.connectionId = connectionId;
}
@Override
public String getType() {
return NamingRemoteConstants.FORWARD_HEART_BEAT;
}
public String getConnectionId() {
return connectionId;
}
public void setConnectionId(String connectionId) {
this.connectionId = connectionId;
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster.remote.request;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.naming.remote.request.InstanceRequest;
import com.alibaba.nacos.api.remote.request.RequestMeta;
/**
* Forward instance request.
*
* @author xiweng.yy
*/
public class ForwardInstanceRequest extends AbstractClusterRequest {
private InstanceRequest instanceRequest;
private RequestMeta sourceRequestMeta;
public ForwardInstanceRequest() {
}
public ForwardInstanceRequest(InstanceRequest instanceRequest, RequestMeta sourceRequestMeta) {
this.instanceRequest = instanceRequest;
this.sourceRequestMeta = sourceRequestMeta;
}
@Override
public String getType() {
return NamingRemoteConstants.FORWARD_INSTANCE;
}
public InstanceRequest getInstanceRequest() {
return instanceRequest;
}
public void setInstanceRequest(InstanceRequest instanceRequest) {
this.instanceRequest = instanceRequest;
}
public RequestMeta getSourceRequestMeta() {
return sourceRequestMeta;
}
public void setSourceRequestMeta(RequestMeta sourceRequestMeta) {
this.sourceRequestMeta = sourceRequestMeta;
}
}

View File

@ -127,6 +127,10 @@ public class GlobalExecutor {
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.nacos-server-performance"));
private static final ScheduledExecutorService REMOTE_CONNECTION_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.remote-connection-manager"));
public static void submitDataSync(Runnable runnable, long delay) {
DATA_SYNC_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
@ -241,4 +245,9 @@ public class GlobalExecutor {
public static void schedulePerformanceLogger(Runnable runnable, long initialDelay, long delay, TimeUnit unit) {
SERVER_PERFORMANCE_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
}
public static void scheduleRemoteConnectionManager(Runnable runnable, long initialDelay, long delay,
TimeUnit unit) {
REMOTE_CONNECTION_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
}
}

View File

@ -40,8 +40,11 @@ public class RemotingConnection {
private final Connection connection;
private long lastHeartBeatTime;
public RemotingConnection(Connection connection) {
this.connection = connection;
this.lastHeartBeatTime = System.currentTimeMillis();
}
public String getConnectionId() {
@ -115,4 +118,12 @@ public class RemotingConnection {
public ConcurrentMap<String, Set<Instance>> getInstanceIndex() {
return instanceIndex;
}
public long getLastHeartBeatTime() {
return lastHeartBeatTime;
}
public void setLastHeartBeatTime(long lastHeartBeatTime) {
this.lastHeartBeatTime = lastHeartBeatTime;
}
}

View File

@ -16,19 +16,29 @@
package com.alibaba.nacos.naming.remote;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.connection.Connection;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.core.remote.ClientConnectionEventListener;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.core.remote.event.RemotingHeartBeatEvent;
import com.alibaba.nacos.naming.cluster.remote.ClusterClient;
import com.alibaba.nacos.naming.cluster.remote.ClusterClientManager;
import com.alibaba.nacos.naming.cluster.remote.request.ForwardHeartBeatRequest;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.push.RemotePushService;
import com.alibaba.nacos.naming.remote.task.RenewInstanceBeatTask;
import com.alibaba.nacos.naming.remote.worker.RemotingWorkersManager;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* Remoting connection holder.
@ -44,35 +54,112 @@ public class RemotingConnectionHolder extends ClientConnectionEventListener {
private final ServiceManager serviceManager;
public RemotingConnectionHolder(RemotePushService remotePushService, ServiceManager serviceManager) {
public RemotingConnectionHolder(RemotePushService remotePushService, ServiceManager serviceManager,
ClusterClientManager clusterClientManager) {
this.remotePushService = remotePushService;
this.serviceManager = serviceManager;
NotifyCenter.registerSubscriber(new RemotingHeartBeatSubscriber(this, clusterClientManager));
GlobalExecutor.scheduleRemoteConnectionManager(new RemotingConnectionCleaner(this), 0,
Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
}
@Override
public void clientConnected(Connection connect) {
connectionCache.put(connect.getConnectionId(), new RemotingConnection(connect));
Loggers.SRV_LOG.info("Client connection {} connect", connect.getConnectionId());
if (!connectionCache.containsKey(connect.getConnectionId())) {
connectionCache.put(connect.getConnectionId(), new RemotingConnection(connect));
}
}
@Override
public void clientDisConnected(Connection connect) {
RemotingConnection remotingConnection = connectionCache.remove(connect.getConnectionId());
try {
for (String each : remotingConnection.getInstanceIndex().keySet()) {
Set<Instance> instances = remotingConnection.getInstanceIndex().get(each);
serviceManager.removeInstance(KeyBuilder.getNamespace(each), KeyBuilder.getServiceName(each), true,
instances.toArray(new Instance[instances.size()]));
}
for (String each : remotingConnection.getSubscriberIndex().keySet()) {
remotePushService.removeAllSubscribeForService(each);
}
} catch (NacosException e) {
Loggers.SRV_LOG
.error(String.format("Remove context of connection %s failed", connect.getConnectionId()), e);
clientDisConnected(connect.getConnectionId());
}
private void clientDisConnected(String connectionId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", connectionId);
RemotingConnection remotingConnection = connectionCache.remove(connectionId);
if (null == remotingConnection) {
return;
}
for (String each : remotingConnection.getSubscriberIndex().keySet()) {
remotePushService.removeAllSubscribeForService(each);
}
}
public RemotingConnection getRemotingConnection(String connectionId) {
return connectionCache.get(connectionId);
}
public Collection<String> getAllConnectionId() {
return connectionCache.keySet();
}
/**
* Renew remoting connection.
*
* @param connectionId connection id
*/
public void renewRemotingConnection(String connectionId) {
if (!connectionCache.containsKey(connectionId)) {
return;
}
RemotingConnection remotingConnection = connectionCache.get(connectionId);
remotingConnection.setLastHeartBeatTime(System.currentTimeMillis());
RemotingWorkersManager.dispatch(connectionId, new RenewInstanceBeatTask(remotingConnection, serviceManager));
}
private static class RemotingHeartBeatSubscriber extends Subscriber<RemotingHeartBeatEvent> {
private final RemotingConnectionHolder remotingConnectionHolder;
private final ClusterClientManager clusterClientManager;
public RemotingHeartBeatSubscriber(RemotingConnectionHolder remotingConnectionHolder,
ClusterClientManager clusterClientManager) {
this.remotingConnectionHolder = remotingConnectionHolder;
this.clusterClientManager = clusterClientManager;
}
@Override
public void onEvent(RemotingHeartBeatEvent event) {
remotingConnectionHolder.renewRemotingConnection(event.getConnectionId());
for (ClusterClient each : clusterClientManager.getAllClusterClient()) {
try {
each.request(new ForwardHeartBeatRequest(event.getConnectionId()));
} catch (NacosException nacosException) {
Loggers.DISTRO.warn("Forward heart beat failed.", nacosException);
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return RemotingHeartBeatEvent.class;
}
}
private static class RemotingConnectionCleaner implements Runnable {
private final RemotingConnectionHolder remotingConnectionHolder;
public RemotingConnectionCleaner(RemotingConnectionHolder remotingConnectionHolder) {
this.remotingConnectionHolder = remotingConnectionHolder;
}
@Override
public void run() {
long currentTime = System.currentTimeMillis();
for (String each : remotingConnectionHolder.getAllConnectionId()) {
RemotingConnection remotingConnection = remotingConnectionHolder.getRemotingConnection(each);
if (null != remotingConnection && isExpireConnection(currentTime, remotingConnection)) {
remotingConnectionHolder.clientDisConnected(each);
}
}
}
private boolean isExpireConnection(long currentTime, RemotingConnection remotingConnection) {
return remotingConnection.getLastHeartBeatTime() - currentTime > Constants.DEFAULT_IP_DELETE_TIMEOUT * 2;
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote.handler;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.HeartBeatResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.naming.cluster.remote.request.ForwardHeartBeatRequest;
import com.alibaba.nacos.naming.remote.RemotingConnectionHolder;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Forward heart beat request handler.
*
* @author xiweng.yy
*/
@Component
public class ForwardHeartBeatRequestHandler extends RequestHandler<ForwardHeartBeatRequest> {
private final RemotingConnectionHolder remotingConnectionHolder;
public ForwardHeartBeatRequestHandler(RemotingConnectionHolder remotingConnectionHolder) {
this.remotingConnectionHolder = remotingConnectionHolder;
}
@Override
public ForwardHeartBeatRequest parseBodyString(String bodyString) {
return JacksonUtils.toObj(bodyString, ForwardHeartBeatRequest.class);
}
@Override
public Response handle(Request request, RequestMeta meta) throws NacosException {
remotingConnectionHolder.renewRemotingConnection(((ForwardHeartBeatRequest) request).getConnectionId());
return new HeartBeatResponse();
}
@Override
public List<String> getRequestTypes() {
return Lists.newArrayList(NamingRemoteConstants.FORWARD_HEART_BEAT);
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote.handler;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.naming.remote.request.InstanceRequest;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.remote.connection.ConnectionMetaInfo;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.naming.cluster.remote.ClusterConnection;
import com.alibaba.nacos.naming.cluster.remote.request.ForwardInstanceRequest;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.remote.RemotingConnectionHolder;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Forward instance request handler.
*
* @author xiweng.yy
*/
@Component
public class ForwardInstanceRequestHandler extends RequestHandler<ForwardInstanceRequest> {
private final InstanceRequestHandler instanceRequestHandler;
private final RemotingConnectionHolder remotingConnectionHolder;
private final DistroMapper distroMapper;
public ForwardInstanceRequestHandler(InstanceRequestHandler instanceRequestHandler,
RemotingConnectionHolder remotingConnectionHolder, DistroMapper distroMapper) {
this.instanceRequestHandler = instanceRequestHandler;
this.remotingConnectionHolder = remotingConnectionHolder;
this.distroMapper = distroMapper;
}
@Override
public ForwardInstanceRequest parseBodyString(String bodyString) {
return JacksonUtils.toObj(bodyString, ForwardInstanceRequest.class);
}
@Override
public Response handle(Request request, RequestMeta meta) throws NacosException {
ForwardInstanceRequest actualRequest = (ForwardInstanceRequest) request;
InstanceRequest instanceRequest = actualRequest.getInstanceRequest();
String serviceName = NamingUtils
.getGroupedName(instanceRequest.getServiceName(), instanceRequest.getGroupName());
if (distroMapper.responsible(serviceName)) {
RequestMeta sourceRequestMeta = actualRequest.getSourceRequestMeta();
addRemotingConnectionIfAbsent(sourceRequestMeta);
return instanceRequestHandler.handle(instanceRequest, sourceRequestMeta);
}
throw new NacosException(NacosException.BAD_GATEWAY,
String.format("Forward instance request to error server, service: %s", serviceName));
}
private void addRemotingConnectionIfAbsent(RequestMeta sourceRequestMeta) {
if (null == remotingConnectionHolder.getRemotingConnection(sourceRequestMeta.getConnectionId())) {
ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(sourceRequestMeta.getConnectionId(),
sourceRequestMeta.getClientIp(), "cluster", sourceRequestMeta.getClientVersion());
remotingConnectionHolder.clientConnected(new ClusterConnection(metaInfo));
}
}
@Override
public List<String> getRequestTypes() {
return Lists.newArrayList(NamingRemoteConstants.FORWARD_INSTANCE);
}
}

View File

@ -27,6 +27,9 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.naming.cluster.remote.ClusterClientManager;
import com.alibaba.nacos.naming.cluster.remote.request.ForwardInstanceRequest;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.misc.Loggers;
@ -49,9 +52,16 @@ public class InstanceRequestHandler extends RequestHandler<InstanceRequest> {
private final RemotingConnectionHolder remotingConnectionHolder;
public InstanceRequestHandler(ServiceManager serviceManager, RemotingConnectionHolder remotingConnectionHolder) {
private final ClusterClientManager clusterClientManager;
private final DistroMapper distroMapper;
public InstanceRequestHandler(ServiceManager serviceManager, RemotingConnectionHolder remotingConnectionHolder,
ClusterClientManager clusterClientManager, DistroMapper distroMapper) {
this.serviceManager = serviceManager;
this.remotingConnectionHolder = remotingConnectionHolder;
this.clusterClientManager = clusterClientManager;
this.distroMapper = distroMapper;
}
@Override
@ -62,20 +72,40 @@ public class InstanceRequestHandler extends RequestHandler<InstanceRequest> {
@Override
public Response handle(Request request, RequestMeta meta) throws NacosException {
InstanceRequest instanceRequest = (InstanceRequest) request;
String namespace = instanceRequest.getNamespace();
String serviceName = NamingUtils
.getGroupedName(instanceRequest.getServiceName(), instanceRequest.getGroupName());
switch (instanceRequest.getType()) {
if (distroMapper.responsible(serviceName)) {
return handleResponsibleRequest(serviceName, instanceRequest, meta);
} else {
return forwardRequestToResponsibleServer(serviceName, instanceRequest, meta);
}
}
private Response handleResponsibleRequest(String serviceName, InstanceRequest request, RequestMeta meta)
throws NacosException {
String namespace = request.getNamespace();
switch (request.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(namespace, serviceName, instanceRequest, meta);
return registerInstance(namespace, serviceName, request, meta);
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(namespace, serviceName, instanceRequest, meta);
return deregisterInstance(namespace, serviceName, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", instanceRequest.getType()));
String.format("Unsupported request type %s", request.getType()));
}
}
private Response forwardRequestToResponsibleServer(String serviceName, InstanceRequest request, RequestMeta meta)
throws NacosException {
String targetAddress = distroMapper.mapSrv(serviceName);
if (clusterClientManager.hasClientForMember(targetAddress)) {
return clusterClientManager.getClusterClient(targetAddress)
.request(new ForwardInstanceRequest(request, meta));
}
throw new NacosException(NacosException.BAD_GATEWAY,
String.format("Can't find responsible server for service %s", serviceName));
}
private Response registerInstance(String namespace, String serviceName, InstanceRequest instanceRequest,
RequestMeta meta) throws NacosException {
if (!serviceManager.containService(namespace, serviceName)) {
@ -86,8 +116,6 @@ public class InstanceRequestHandler extends RequestHandler<InstanceRequest> {
instance.setInstanceId(instance.generateInstanceId());
instance.setLastBeat(System.currentTimeMillis());
// Register instance by connection, do not need keep alive by beat.
instance.setMarked(true);
instance.validate();
serviceManager.addInstance(namespace, serviceName, instance.isEphemeral(), instance);
remotingConnectionHolder.getRemotingConnection(meta.getConnectionId())
.addNewInstance(namespace, serviceName, instance);

View File

@ -0,0 +1,56 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote.task;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.remote.RemotingConnection;
import java.util.Set;
/**
* Renew instance beat task.
*
* @author xiweng.yy
*/
public class RenewInstanceBeatTask implements Runnable {
private final RemotingConnection remotingConnection;
private final ServiceManager serviceManager;
public RenewInstanceBeatTask(RemotingConnection remotingConnection, ServiceManager serviceManager) {
this.remotingConnection = remotingConnection;
this.serviceManager = serviceManager;
}
@Override
public void run() {
for (String each : remotingConnection.getInstanceIndex().keySet()) {
Set<Instance> instances = remotingConnection.getInstanceIndex().get(each);
Service service = serviceManager.getService(KeyBuilder.getNamespace(each), KeyBuilder.getServiceName(each));
for (Instance actual : service.allIPs(true)) {
if (instances.contains(actual)) {
actual.setHealthy(true);
actual.setLastBeat(System.currentTimeMillis());
}
}
}
}
}

View File

@ -0,0 +1,115 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote.worker;
import com.alibaba.nacos.common.lifecycle.Closeable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Remoting worker.
*
* @author xiweng.yy
*/
public final class RemotingWorker implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemotingWorker.class);
private static final String SEPARATOR = "_";
private static final int QUEUE_CAPACITY = 50000;
private final BlockingQueue<Runnable> queue;
private final String name;
private final InnerWorker worker;
public RemotingWorker(final int mod, final int total) {
name = getClass().getName() + "_" + mod + "%" + total;
queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
worker = new InnerWorker(name);
worker.start();
}
public String getName() {
return name;
}
/**
* Execute task.
*/
public void execute(Runnable task) {
putTask(task);
}
private void putTask(Runnable task) {
try {
queue.put(task);
} catch (InterruptedException ire) {
LOGGER.error(ire.toString(), ire);
}
}
public int pendingTaskCount() {
return queue.size();
}
@Override
public void shutdown() {
worker.shutdown();
queue.clear();
}
/**
* Real worker thread.
*/
private class InnerWorker extends Thread implements Closeable {
private volatile boolean start = true;
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
@Override
public void run() {
while (start) {
try {
Runnable task = queue.take();
long begin = System.currentTimeMillis();
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
LOGGER.warn("it takes {}ms to run task {}", duration, task);
}
} catch (Throwable e) {
LOGGER.error("[remoting-worker-error] " + e.toString(), e);
}
}
}
@Override
public void shutdown() {
start = false;
}
}
}

View File

@ -0,0 +1,92 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote.worker;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.lifecycle.Closeable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Remoting workers manager.
*
* @author xiweng.yy
*/
public final class RemotingWorkersManager implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemotingWorkersManager.class);
private static final int TIMES_FOR_CORE = 2;
/**
* power of 2.
*/
private static final RemotingWorker[] REMOTING_WORKERS;
private RemotingWorkersManager() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
shutdown();
} catch (NacosException nacosException) {
LOGGER.warn("shutdown RemotingWorkersManager failed", nacosException);
}
}));
}
static {
// Find a power of 2 >= cpuCores * 2.
final int coreCount = Runtime.getRuntime().availableProcessors();
int workerCount = 1;
while (workerCount < coreCount * TIMES_FOR_CORE) {
workerCount <<= 1;
}
REMOTING_WORKERS = new RemotingWorker[workerCount];
for (int mod = 0; mod < workerCount; ++mod) {
REMOTING_WORKERS[mod] = new RemotingWorker(mod, workerCount);
}
}
/**
* Dispatch task by connectionId.
*/
public static void dispatch(String connectionId, Runnable task) {
RemotingWorker worker = getWorker(connectionId);
worker.execute(task);
}
/**
* Get worker of connection id.
*
* @param connectionId connection Id
* @return remoting worker
*/
private static RemotingWorker getWorker(String connectionId) {
int idx = connectionId.hashCode() & (REMOTING_WORKERS.length - 1);
return REMOTING_WORKERS[idx];
}
public static int workersCount() {
return REMOTING_WORKERS.length;
}
@Override
public void shutdown() throws NacosException {
for (RemotingWorker each : REMOTING_WORKERS) {
each.shutdown();
}
}
}

View File

@ -277,6 +277,7 @@
<exclude>**/istio/model/mcp/*.java</exclude>
<exclude>**/istio/model/naming/*.java</exclude>
<exclude>**/istio/model/*.java</exclude>
<exclude>**/api/grpc/*.java</exclude>
</excludes>
</configuration>
<executions>
@ -304,7 +305,7 @@
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<excludes>**/istio/model/**,**/nacos/test/**</excludes>
<excludes>**/istio/model/**,**/nacos/test/**,**/api/grpc/**</excludes>
</configuration>
<executions>
<execution>