diff --git a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java index 5ef0f1630..aeff42475 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java @@ -25,6 +25,7 @@ import com.alibaba.nacos.common.http.HttpClientConfig; import com.alibaba.nacos.common.http.HttpRestResult; import com.alibaba.nacos.common.http.client.NacosRestTemplate; import com.alibaba.nacos.common.http.param.Header; +import com.alibaba.nacos.common.http.param.Query; import com.alibaba.nacos.common.utils.ExceptionUtil; import org.slf4j.Logger; diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/http/NamingHttpClientProxy.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/http/NamingHttpClientProxy.java index c08e48b22..a6767f0e5 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/http/NamingHttpClientProxy.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/http/NamingHttpClientProxy.java @@ -48,6 +48,7 @@ import com.alibaba.nacos.client.utils.TemplateUtils; import com.alibaba.nacos.common.http.HttpRestResult; import com.alibaba.nacos.common.http.client.NacosRestTemplate; import com.alibaba.nacos.common.http.param.Header; +import com.alibaba.nacos.common.http.param.Query; import com.alibaba.nacos.common.utils.HttpMethod; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.StringUtils; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java index f11e3089c..4f48c263b 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java @@ -101,7 +101,7 @@ public class AsyncNotifyService { } } if (!httpQueue.isEmpty()) { - ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue)); + ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue)); } if (!rpcQueue.isEmpty()) { ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClient.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClient.java deleted file mode 100644 index 60033694b..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClient.java +++ /dev/null @@ -1,369 +0,0 @@ -/* - * Copyright 1999-2020 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.cluster.remote.grpc; - -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.grpc.auto.Metadata; -import com.alibaba.nacos.api.grpc.auto.Payload; -import com.alibaba.nacos.api.grpc.auto.RequestGrpc; -import com.alibaba.nacos.api.grpc.auto.RequestStreamGrpc; -import com.alibaba.nacos.api.remote.PayloadRegistry; -import com.alibaba.nacos.api.remote.request.HeartBeatRequest; -import com.alibaba.nacos.api.remote.request.Request; -import com.alibaba.nacos.api.remote.request.ServerCheckRequest; -import com.alibaba.nacos.api.remote.response.ConnectionUnregisterResponse; -import com.alibaba.nacos.api.remote.response.PlainBodyResponse; -import com.alibaba.nacos.api.remote.response.Response; -import com.alibaba.nacos.common.utils.JacksonUtils; -import com.alibaba.nacos.naming.cluster.remote.RpcClient; -import com.alibaba.nacos.naming.cluster.remote.RpcClientStatus; -import com.alibaba.nacos.naming.misc.NetUtils; -import com.alibaba.nacos.naming.misc.UtilsAndCommons; -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.grpc.stub.StreamObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * gRPC Client. - * - * @author liuzunfei - * @version $Id: GrpcClient.java, v 0.1 2020年07月13日 9:16 PM liuzunfei Exp $ - */ -public class GrpcClient extends RpcClient { - - private static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class); - - /** - * change listeners handler registry. - */ - protected List serverPushResponseListeners = new ArrayList(); - - protected ManagedChannel channel; - - protected RequestStreamGrpc.RequestStreamStub grpcStreamServiceStub; - - protected RequestGrpc.RequestBlockingStub grpcServiceStub; - - public GrpcClient(String target) { - super(target); - } - - /** - * create a new channel . - * - * @param serverIp serverIp. - * @param serverPort serverPort. - * @return if server check success,return stub. - */ - private RequestGrpc.RequestBlockingStub createNewChannelStub(String serverIp, int serverPort) { - - ManagedChannel managedChannelTemp = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext() - .build(); - - RequestGrpc.RequestBlockingStub grpcServiceStubTemp = RequestGrpc.newBlockingStub(managedChannelTemp); - boolean checkSuccess = serverCheck(grpcServiceStubTemp); - LOGGER.info(String.format("create cluster channel to %s:%d result %s", serverIp, serverPort, checkSuccess)); - - if (checkSuccess) { - return grpcServiceStubTemp; - } else { - shuntDownChannel(managedChannelTemp); - return null; - } - - } - - /** - * shutdown a channel. - * - * @param managedChannel channel to be shutdown. - */ - private void shuntDownChannel(ManagedChannel managedChannel) { - if (managedChannel != null && !managedChannel.isShutdown()) { - managedChannel.shutdownNow(); - } - } - - private void connectToServer() { - rpcClientStatus.compareAndSet(RpcClientStatus.INITED, RpcClientStatus.STARTING); - GrpcServerInfo serverInfo = resolveServerInfo(target); - RequestGrpc.RequestBlockingStub newChannelStubTemp = createNewChannelStub(serverInfo.serverIp, - serverInfo.serverPort); - if (newChannelStubTemp != null) { - RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc - .newStub(newChannelStubTemp.getChannel()); - bindRequestStream(requestStreamStubTemp); - //switch current channel and stub - channel = (ManagedChannel) newChannelStubTemp.getChannel(); - grpcStreamServiceStub = requestStreamStubTemp; - grpcServiceStub = newChannelStubTemp; - rpcClientStatus.set(RpcClientStatus.RUNNING); - } else { - switchServer(true); - } - } - - @Override - public void start() throws NacosException { - - if (rpcClientStatus.get() == RpcClientStatus.WAIT_INIT) { - LOGGER.error("RpcClient has not init yet, please check init ServerListFactory..."); - throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "RpcClient not init yet"); - } - if (rpcClientStatus.get() == RpcClientStatus.RUNNING || rpcClientStatus.get() == RpcClientStatus.STARTING) { - return; - } - - connectToServer(); - - executorService.scheduleWithFixedDelay(() -> sendBeat(), 0, 3000, TimeUnit.MILLISECONDS); - } - - /** - * switch a new server. - */ - private void switchServer(final boolean onStarting) { - - if (onStarting) { - // access on startup fail - rpcClientStatus.set(RpcClientStatus.SWITCHING_SERVER); - - } else { - // access from running status, sendbeat fail or receive reset message from server. - boolean changeStatusSuccess = rpcClientStatus - .compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.SWITCHING_SERVER); - if (!changeStatusSuccess) { - return; - } - } - - executorService.schedule(() -> { - // loop until start client success. - while (!isRunning()) { - - //1.get a new server - GrpcServerInfo serverInfo = resolveServerInfo(target); - - //2.get a new channel to new server - RequestGrpc.RequestBlockingStub newChannelStubTemp = createNewChannelStub(serverInfo.serverIp, - serverInfo.serverPort); - if (newChannelStubTemp != null) { - RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc - .newStub(newChannelStubTemp.getChannel()); - bindRequestStream(requestStreamStubTemp); - final ManagedChannel depratedChannel = channel; - //switch current channel and stub - channel = (ManagedChannel) newChannelStubTemp.getChannel(); - grpcStreamServiceStub = requestStreamStubTemp; - grpcServiceStub = newChannelStubTemp; - rpcClientStatus.getAndSet(RpcClientStatus.RUNNING); - shuntDownChannel(depratedChannel); - continue; - } - try { - //sleep 3 second to switch next server. - Thread.sleep(3000L); - } catch (InterruptedException e) { - // Do nothing. - } - } - }, 0L, TimeUnit.MILLISECONDS); - - } - - /** - * Send Heart Beat Request. - */ - public void sendBeat() { - try { - - if (!isRunning()) { - return; - } - HeartBeatRequest heartBeatRequest = new HeartBeatRequest(); - Response response = request(heartBeatRequest); - if (response instanceof ConnectionUnregisterResponse) { - LOGGER.warn("Send heart beat fail,connection is not registerd,trying to switch server "); - switchServer(false); - } - } catch (StatusRuntimeException e) { - if (Status.UNAVAILABLE.getCode().equals(e.getStatus().getCode())) { - LOGGER.warn("Send heart beat fail,server is not avaliable now,trying to switch server "); - switchServer(false); - return; - } - throw e; - } catch (Exception e) { - LOGGER.error("Send heart beat error, ", e); - } - } - - private Metadata.Builder buildMeta() { - return Metadata.newBuilder().setClientIp(NetUtils.localServer()) - .setClientVersion(UtilsAndCommons.SERVER_VERSION); - - } - - /** - * chenck server if ok. - * - * @param requestBlockingStub requestBlockingStub used to check server. - * @return - */ - private boolean serverCheck(RequestGrpc.RequestBlockingStub requestBlockingStub) { - try { - ServerCheckRequest serverCheckRequest = new ServerCheckRequest(); - - Metadata meta = Metadata.newBuilder().setClientIp(NetUtils.localServer()) - .setClientVersion(UtilsAndCommons.SERVER_VERSION).setType(ServerCheckRequest.class.getName()) - .build(); - - Payload streamRequest = Payload.newBuilder().setMetadata(meta).setBody( - Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(serverCheckRequest))) - .build()).build(); - Payload response = requestBlockingStub.request(streamRequest); - return response != null; - } catch (Exception e) { - return false; - } - } - - /** - * bind request stream observer (send a connection). - * - * @param streamStub streamStub to bind. - */ - private void bindRequestStream(RequestStreamGrpc.RequestStreamStub streamStub) { - Payload streamRequest = Payload.newBuilder().setMetadata(buildMeta()).build(); - LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest); - streamStub.requestStream(streamRequest, new StreamObserver() { - @Override - public void onNext(Payload grpcResponse) { - - LOGGER.debug(" stream response receive ,original reponse :{}", grpcResponse); - try { - - String message = grpcResponse.getBody().getValue().toStringUtf8(); - String type = grpcResponse.getMetadata().getType(); - String bodyString = grpcResponse.getBody().getValue().toStringUtf8(); - Class classByType = PayloadRegistry.getClassbyType(type); - final Response response; - if (classByType != null) { - response = (Response) JacksonUtils.toObj(bodyString, classByType); - } else { - PlainBodyResponse myresponse = JacksonUtils.toObj(bodyString, PlainBodyResponse.class); - myresponse.setBodyString(bodyString); - response = myresponse; - } - serverPushResponseListeners - .forEach(serverPushResponseHandler -> serverPushResponseHandler.responseReply(response)); - } catch (Exception e) { - LOGGER.error("error tp process server push response :{}", grpcResponse); - } - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onCompleted() { - } - }); - } - - @Override - public Response request(Request request) throws NacosException { - - if (!this.isRunning()) { - throw new IllegalStateException("Client is not connected to any server now,please retry later"); - } - try { - - Payload grpcrequest = Payload.newBuilder().setMetadata(buildMeta().setType(request.getClass().getName())) - .setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request)))).build(); - Payload response = grpcServiceStub.request(grpcrequest); - String type = response.getMetadata().getType(); - String bodyString = response.getBody().getValue().toStringUtf8(); - - // transfrom grpcResponse to response model - Class classByType = PayloadRegistry.getClassbyType(type); - if (classByType != null) { - Object object = JacksonUtils.toObj(bodyString, classByType); - if (object instanceof ConnectionUnregisterResponse) { - switchServer(false); - throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "connection is not connected."); - } - return (Response) object; - } else { - PlainBodyResponse myresponse = JacksonUtils.toObj(bodyString, PlainBodyResponse.class); - myresponse.setBodyString(bodyString); - return (PlainBodyResponse) myresponse; - } - } catch (StatusRuntimeException e) { - if (Status.UNAVAILABLE.equals(e.getStatus())) { - LOGGER.warn("request fail,server is not avaliable now,trying to switch server "); - switchServer(false); - } - throw e; - } catch (Exception e) { - LOGGER.error("grpc client request error, error message is ", e.getMessage(), e); - throw new NacosException(NacosException.SERVER_ERROR, e); - } - } - - @Override - public void shutdown() throws NacosException { - if (this.channel != null && !this.channel.isShutdown()) { - this.channel.shutdownNow(); - } - } - - private GrpcServerInfo resolveServerInfo(String serverAddress) { - GrpcServerInfo serverInfo = new GrpcServerInfo(); - serverInfo.serverPort = 1000; - if (serverAddress.contains("http")) { - serverInfo.serverIp = serverAddress.split(":")[1].replaceAll("//", ""); - serverInfo.serverPort += Integer.valueOf(serverAddress.split(":")[2].replaceAll("//", "")); - } else { - serverInfo.serverIp = serverAddress.split(":")[0]; - serverInfo.serverPort += Integer.valueOf(serverAddress.split(":")[1]); - } - return serverInfo; - } - - class GrpcServerInfo { - - String serverIp; - - int serverPort; - - } -} - - - diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClusterClient.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClusterClient.java index 6757fc0f4..bd9d38af4 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClusterClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/GrpcClusterClient.java @@ -19,6 +19,7 @@ package com.alibaba.nacos.naming.cluster.remote.grpc; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.remote.client.RpcClientFactory; import com.alibaba.nacos.common.remote.client.grpc.GrpcClient; import com.alibaba.nacos.naming.cluster.remote.ClusterClient; @@ -32,7 +33,7 @@ public class GrpcClusterClient implements ClusterClient { private final GrpcClient grpcClient; public GrpcClusterClient(String targetAddress) { - this.grpcClient = new GrpcClient(new SingleServerListFactory(targetAddress)); + this.grpcClient = new GrpcClient(targetAddress); } @Override diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/SingleServerListFactory.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/SingleServerListFactory.java index c192301e2..231475dcd 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/SingleServerListFactory.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/grpc/SingleServerListFactory.java @@ -18,6 +18,8 @@ package com.alibaba.nacos.naming.cluster.remote.grpc; import com.alibaba.nacos.common.remote.client.ServerListFactory; +import java.util.List; + /** * Single server list factory. * @@ -40,4 +42,9 @@ public class SingleServerListFactory implements ServerListFactory { public String getCurrentServer() { return address; } + + @Override + public List getServerList() { + return null; + } }