From 56348af81b8022ef2c09296c1224e9f50174e650 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E5=BB=B6?= <1060026287@qq.com> Date: Tue, 29 Dec 2020 13:54:21 +0800 Subject: [PATCH] make code more readable. (#4568) * make code more readable. * code enhance * code enhance * fix comment for grpc client * initHandlerRpcClient -> initRpcClientHandler * rename some method and modify some comments * code enhance. * code enhance. * rename NamingPushResponseHandler -> NamingPushRequestHandler * fix connectionId typo --- .../api/grpc/auto/BiRequestStreamGrpc.java | 16 +++---- .../nacos/api/grpc/auto/RequestGrpc.java | 21 ++++----- .../api/grpc/auto/RequestStreamGrpc.java | 47 ++++++++++--------- .../client/config/impl/ClientWorker.java | 6 +-- .../remote/gprc/NamingGrpcClientProxy.java | 2 +- ...ler.java => NamingPushRequestHandler.java} | 6 +-- .../nacos/common/remote/client/RpcClient.java | 10 ++-- .../remote/client/ServerRequestHandler.java | 4 +- .../remote/client/grpc/GrpcClusterClient.java | 4 +- .../remote/client/grpc/GrpcConnection.java | 2 +- .../remote/client/grpc/GrpcOpsClient.java | 4 +- .../remote/client/grpc/GrpcSdkClient.java | 2 +- .../common/remote/client/grpc/GrpcUtils.java | 23 ++++----- .../remote/RpcAckCallbackSynchronizer.java | 22 ++++----- .../core/remote/grpc/BaseGrpcServer.java | 12 ++--- 15 files changed, 92 insertions(+), 89 deletions(-) rename client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/{NamingPushResponseHandler.java => NamingPushRequestHandler.java} (90%) diff --git a/api/src/main/java/com/alibaba/nacos/api/grpc/auto/BiRequestStreamGrpc.java b/api/src/main/java/com/alibaba/nacos/api/grpc/auto/BiRequestStreamGrpc.java index b074704a3..945a7ab43 100644 --- a/api/src/main/java/com/alibaba/nacos/api/grpc/auto/BiRequestStreamGrpc.java +++ b/api/src/main/java/com/alibaba/nacos/api/grpc/auto/BiRequestStreamGrpc.java @@ -16,10 +16,9 @@ package com.alibaba.nacos.api.grpc.auto; -import static io.grpc.MethodDescriptor.generateFullMethodName; -import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall; -import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall; -import static io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall; +import io.grpc.MethodDescriptor; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.ServerCalls; /** */ @@ -43,7 +42,8 @@ public final class BiRequestStreamGrpc { if ((getRequestBiStreamMethod = BiRequestStreamGrpc.getRequestBiStreamMethod) == null) { BiRequestStreamGrpc.getRequestBiStreamMethod = getRequestBiStreamMethod = io.grpc.MethodDescriptor.newBuilder() .setType(io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING) - .setFullMethodName(generateFullMethodName("BiRequestStream", "requestBiStream")) + .setFullMethodName( + MethodDescriptor.generateFullMethodName("BiRequestStream", "requestBiStream")) .setSampledToLocalTracing(true).setRequestMarshaller(io.grpc.protobuf.ProtoUtils .marshaller(com.alibaba.nacos.api.grpc.auto.Payload.getDefaultInstance())) .setResponseMarshaller(io.grpc.protobuf.ProtoUtils @@ -88,13 +88,13 @@ public final class BiRequestStreamGrpc { */ public io.grpc.stub.StreamObserver requestBiStream( io.grpc.stub.StreamObserver responseObserver) { - return asyncUnimplementedStreamingCall(getRequestBiStreamMethod(), responseObserver); + return ServerCalls.asyncUnimplementedStreamingCall(getRequestBiStreamMethod(), responseObserver); } @java.lang.Override public final io.grpc.ServerServiceDefinition bindService() { return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()).addMethod(getRequestBiStreamMethod(), - asyncBidiStreamingCall( + ServerCalls.asyncBidiStreamingCall( new MethodHandlers( this, METHODID_REQUEST_BI_STREAM))).build(); } @@ -124,7 +124,7 @@ public final class BiRequestStreamGrpc { */ public io.grpc.stub.StreamObserver requestBiStream( io.grpc.stub.StreamObserver responseObserver) { - return asyncBidiStreamingCall(getChannel().newCall(getRequestBiStreamMethod(), getCallOptions()), + return ClientCalls.asyncBidiStreamingCall(getChannel().newCall(getRequestBiStreamMethod(), getCallOptions()), responseObserver); } } diff --git a/api/src/main/java/com/alibaba/nacos/api/grpc/auto/RequestGrpc.java b/api/src/main/java/com/alibaba/nacos/api/grpc/auto/RequestGrpc.java index fd893143e..9bb0a934a 100644 --- a/api/src/main/java/com/alibaba/nacos/api/grpc/auto/RequestGrpc.java +++ b/api/src/main/java/com/alibaba/nacos/api/grpc/auto/RequestGrpc.java @@ -16,12 +16,9 @@ package com.alibaba.nacos.api.grpc.auto; -import static io.grpc.MethodDescriptor.generateFullMethodName; -import static io.grpc.stub.ClientCalls.asyncUnaryCall; -import static io.grpc.stub.ClientCalls.blockingUnaryCall; -import static io.grpc.stub.ClientCalls.futureUnaryCall; -import static io.grpc.stub.ServerCalls.asyncUnaryCall; -import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall; +import io.grpc.MethodDescriptor; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.ServerCalls; /** */ @@ -45,7 +42,7 @@ public final class RequestGrpc { if ((getRequestMethod = RequestGrpc.getRequestMethod) == null) { RequestGrpc.getRequestMethod = getRequestMethod = io.grpc.MethodDescriptor.newBuilder() .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName("Request", "request")) + .setFullMethodName(MethodDescriptor.generateFullMethodName("Request", "request")) .setSampledToLocalTracing(true).setRequestMarshaller(io.grpc.protobuf.ProtoUtils .marshaller(com.alibaba.nacos.api.grpc.auto.Payload.getDefaultInstance())) .setResponseMarshaller(io.grpc.protobuf.ProtoUtils @@ -89,13 +86,13 @@ public final class RequestGrpc { */ public void request(com.alibaba.nacos.api.grpc.auto.Payload request, io.grpc.stub.StreamObserver responseObserver) { - asyncUnimplementedUnaryCall(getRequestMethod(), responseObserver); + ServerCalls.asyncUnimplementedUnaryCall(getRequestMethod(), responseObserver); } @java.lang.Override public final io.grpc.ServerServiceDefinition bindService() { return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()).addMethod(getRequestMethod(), - asyncUnaryCall( + ServerCalls.asyncUnaryCall( new MethodHandlers( this, METHODID_REQUEST))).build(); } @@ -125,7 +122,7 @@ public final class RequestGrpc { */ public void request(com.alibaba.nacos.api.grpc.auto.Payload request, io.grpc.stub.StreamObserver responseObserver) { - asyncUnaryCall(getChannel().newCall(getRequestMethod(), getCallOptions()), request, responseObserver); + ClientCalls.asyncUnaryCall(getChannel().newCall(getRequestMethod(), getCallOptions()), request, responseObserver); } } @@ -152,7 +149,7 @@ public final class RequestGrpc { * */ public com.alibaba.nacos.api.grpc.auto.Payload request(com.alibaba.nacos.api.grpc.auto.Payload request) { - return blockingUnaryCall(getChannel(), getRequestMethod(), getCallOptions(), request); + return ClientCalls.blockingUnaryCall(getChannel(), getRequestMethod(), getCallOptions(), request); } } @@ -180,7 +177,7 @@ public final class RequestGrpc { */ public com.google.common.util.concurrent.ListenableFuture request( com.alibaba.nacos.api.grpc.auto.Payload request) { - return futureUnaryCall(getChannel().newCall(getRequestMethod(), getCallOptions()), request); + return ClientCalls.futureUnaryCall(getChannel().newCall(getRequestMethod(), getCallOptions()), request); } } diff --git a/api/src/main/java/com/alibaba/nacos/api/grpc/auto/RequestStreamGrpc.java b/api/src/main/java/com/alibaba/nacos/api/grpc/auto/RequestStreamGrpc.java index cbf152f89..5b6c78cb7 100644 --- a/api/src/main/java/com/alibaba/nacos/api/grpc/auto/RequestStreamGrpc.java +++ b/api/src/main/java/com/alibaba/nacos/api/grpc/auto/RequestStreamGrpc.java @@ -16,13 +16,12 @@ package com.alibaba.nacos.api.grpc.auto; -import static io.grpc.MethodDescriptor.generateFullMethodName; -import static io.grpc.stub.ClientCalls.asyncServerStreamingCall; -import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; -import static io.grpc.stub.ServerCalls.asyncServerStreamingCall; -import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall; +import io.grpc.MethodDescriptor; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.ServerCalls; /** + * */ @javax.annotation.Generated(value = "by gRPC proto compiler (version 1.20.0)", comments = "Source: nacos_grpc_service.proto") public final class RequestStreamGrpc { @@ -43,8 +42,8 @@ public final class RequestStreamGrpc { synchronized (RequestStreamGrpc.class) { if ((getRequestStreamMethod = RequestStreamGrpc.getRequestStreamMethod) == null) { RequestStreamGrpc.getRequestStreamMethod = getRequestStreamMethod = io.grpc.MethodDescriptor.newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING) - .setFullMethodName(generateFullMethodName("RequestStream", "requestStream")) + .setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING).setFullMethodName( + MethodDescriptor.generateFullMethodName("RequestStream", "requestStream")) .setSampledToLocalTracing(true).setRequestMarshaller(io.grpc.protobuf.ProtoUtils .marshaller(com.alibaba.nacos.api.grpc.auto.Payload.getDefaultInstance())) .setResponseMarshaller(io.grpc.protobuf.ProtoUtils @@ -78,6 +77,7 @@ public final class RequestStreamGrpc { } /** + * */ public static abstract class RequestStreamImplBase implements io.grpc.BindableService { @@ -88,19 +88,20 @@ public final class RequestStreamGrpc { */ public void requestStream(com.alibaba.nacos.api.grpc.auto.Payload request, io.grpc.stub.StreamObserver responseObserver) { - asyncUnimplementedUnaryCall(getRequestStreamMethod(), responseObserver); + ServerCalls.asyncUnimplementedUnaryCall(getRequestStreamMethod(), responseObserver); } - + @java.lang.Override public final io.grpc.ServerServiceDefinition bindService() { return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()).addMethod(getRequestStreamMethod(), - asyncServerStreamingCall( + ServerCalls.asyncServerStreamingCall( new MethodHandlers( this, METHODID_REQUEST_STREAM))).build(); } } /** + * */ public static final class RequestStreamStub extends io.grpc.stub.AbstractStub { @@ -111,7 +112,7 @@ public final class RequestStreamGrpc { private RequestStreamStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { super(channel, callOptions); } - + @java.lang.Override protected RequestStreamStub build(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { return new RequestStreamStub(channel, callOptions); @@ -124,12 +125,14 @@ public final class RequestStreamGrpc { */ public void requestStream(com.alibaba.nacos.api.grpc.auto.Payload request, io.grpc.stub.StreamObserver responseObserver) { - asyncServerStreamingCall(getChannel().newCall(getRequestStreamMethod(), getCallOptions()), request, - responseObserver); + ClientCalls + .asyncServerStreamingCall(getChannel().newCall(getRequestStreamMethod(), getCallOptions()), request, + responseObserver); } } /** + * */ public static final class RequestStreamBlockingStub extends io.grpc.stub.AbstractStub { @@ -140,7 +143,7 @@ public final class RequestStreamGrpc { private RequestStreamBlockingStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { super(channel, callOptions); } - + @java.lang.Override protected RequestStreamBlockingStub build(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { return new RequestStreamBlockingStub(channel, callOptions); @@ -153,11 +156,13 @@ public final class RequestStreamGrpc { */ public java.util.Iterator requestStream( com.alibaba.nacos.api.grpc.auto.Payload request) { - return blockingServerStreamingCall(getChannel(), getRequestStreamMethod(), getCallOptions(), request); + return ClientCalls + .blockingServerStreamingCall(getChannel(), getRequestStreamMethod(), getCallOptions(), request); } } /** + * */ public static final class RequestStreamFutureStub extends io.grpc.stub.AbstractStub { @@ -168,7 +173,7 @@ public final class RequestStreamGrpc { private RequestStreamFutureStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { super(channel, callOptions); } - + @java.lang.Override protected RequestStreamFutureStub build(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { return new RequestStreamFutureStub(channel, callOptions); @@ -188,7 +193,7 @@ public final class RequestStreamGrpc { this.serviceImpl = serviceImpl; this.methodId = methodId; } - + @java.lang.Override @java.lang.SuppressWarnings("unchecked") public void invoke(Req request, io.grpc.stub.StreamObserver responseObserver) { @@ -201,7 +206,7 @@ public final class RequestStreamGrpc { throw new AssertionError(); } } - + @java.lang.Override @java.lang.SuppressWarnings("unchecked") public io.grpc.stub.StreamObserver invoke(io.grpc.stub.StreamObserver responseObserver) { @@ -217,12 +222,12 @@ public final class RequestStreamGrpc { RequestStreamBaseDescriptorSupplier() { } - + @java.lang.Override public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() { return com.alibaba.nacos.api.grpc.auto.NacosGrpcService.getDescriptor(); } - + @java.lang.Override public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() { return getFileDescriptor().findServiceByName("RequestStream"); @@ -243,7 +248,7 @@ public final class RequestStreamGrpc { RequestStreamMethodDescriptorSupplier(String methodName) { this.methodName = methodName; } - + @java.lang.Override public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() { return getServiceDescriptor().findMethodByName(methodName); diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index 8c96b42b2..22f438f9c 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -555,11 +555,11 @@ public class ClientWorker implements Closeable { return labels; } - private void initHandlerRpcClient(final RpcClient rpcClientInner) { + private void initRpcClientHandler(final RpcClient rpcClientInner) { /* * Register Listen Change Handler */ - rpcClientInner.registerServerPushResponseHandler((request, requestMeta) -> { + rpcClientInner.registerServerRequestHandler((request, requestMeta) -> { if (request instanceof ConfigChangeNotifyRequest) { ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request; LOGGER.info("[{}] [server-push] config changed. dataId={}, group={}", getName(), @@ -818,7 +818,7 @@ public class ClientWorker implements Closeable { RpcClient rpcClient = RpcClientFactory .createClient("config-" + taskId + "-" + uuid, getConnectionType(), newlabels); if (rpcClient.isWaitInitiated()) { - initHandlerRpcClient(rpcClient); + initRpcClientHandler(rpcClient); rpcClient.start(); } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java index 26cd5b98e..347cc5b52 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java @@ -86,7 +86,7 @@ public class NamingGrpcClientProxy implements NamingClientProxy { private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException { rpcClient.init(serverListFactory); rpcClient.start(); - rpcClient.registerServerPushResponseHandler(new NamingPushResponseHandler(serviceInfoHolder)); + rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder)); rpcClient.registerConnectionListener(namingGrpcConnectionEventListener); } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushResponseHandler.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java similarity index 90% rename from client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushResponseHandler.java rename to client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java index 1c3c4f0c7..b6021d2fc 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushResponseHandler.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java @@ -25,15 +25,15 @@ import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder; import com.alibaba.nacos.common.remote.client.ServerRequestHandler; /** - * Naming push response handler. + * Naming push request handler. * * @author xiweng.yy */ -public class NamingPushResponseHandler implements ServerRequestHandler { +public class NamingPushRequestHandler implements ServerRequestHandler { private final ServiceInfoHolder serviceInfoHolder; - public NamingPushResponseHandler(ServiceInfoHolder serviceInfoHolder) { + public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) { this.serviceInfoHolder = serviceInfoHolder; } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java index 8fd1c4b59..e0ab68d88 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java @@ -83,7 +83,7 @@ public abstract class RpcClient implements Closeable { private static final long DEFAULT_TIMEOUT_MILLS = 3000L; /** - * listener called where connect status changed. + * listener called where connection's status changed. */ protected List connectionEventListeners = new ArrayList(); @@ -283,7 +283,7 @@ public abstract class RpcClient implements Closeable { switchServerAsync(); } - registerServerPushResponseHandler(new ConnectResetRequestHandler()); + registerServerRequestHandler(new ConnectResetRequestHandler()); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { @@ -700,7 +700,7 @@ public abstract class RpcClient implements Closeable { } /** - * register connection handler.will be notified when inner connect changed. + * Register connection handler. Will be notified when inner connection's state changed. * * @param connectionEventListener connectionEventListener */ @@ -712,11 +712,11 @@ public abstract class RpcClient implements Closeable { } /** - * register change listeners ,will be called when server send change notify response th current client. + * Register serverRequestHandler, the handler will handle the request from server side. * * @param serverRequestHandler serverRequestHandler */ - public synchronized void registerServerPushResponseHandler(ServerRequestHandler serverRequestHandler) { + public synchronized void registerServerRequestHandler(ServerRequestHandler serverRequestHandler) { LoggerUtils.printIfInfoEnabled(LOGGER, "Register server push request handler:{}", serverRequestHandler.getClass().getName()); diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerRequestHandler.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerRequestHandler.java index 63cd6c3c4..a712160a8 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerRequestHandler.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerRequestHandler.java @@ -21,10 +21,10 @@ import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; /** - * ServerPushResponseHandler. + * ServerRequestHandler, to process the request from server side. * * @author liuzunfei - * @version $Id: ServerPushResponseHandler.java, v 0.1 2020年07月14日 11:41 AM liuzunfei Exp $ + * @version $Id: ServerRequestHandler.java, v 0.1 2020年07月14日 11:41 AM liuzunfei Exp $ */ public interface ServerRequestHandler { diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java index 346fb913f..4b210a611 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java @@ -17,10 +17,10 @@ package com.alibaba.nacos.common.remote.client.grpc; /** - * sdk client for grpc. + * gRPC client for cluster. * * @author liuzunfei - * @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $ + * @version $Id: GrpcClusterClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $ */ public class GrpcClusterClient extends GrpcClient { diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java index 9db56c68b..8249b6501 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java @@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** - * grpc connection. + * gRPC connection. * * @author liuzunfei * @version $Id: GrpcConnection.java, v 0.1 2020年08月09日 1:36 PM liuzunfei Exp $ diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java index 1e9aa66cc..636ff9093 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcOpsClient.java @@ -17,10 +17,10 @@ package com.alibaba.nacos.common.remote.client.grpc; /** - * sdk client for grpc. + * gRPC client for ops. * * @author liuzunfei - * @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $ + * @version $Id: GrpcOpsClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $ */ public class GrpcOpsClient extends GrpcClient { diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java index 089ec3d45..82fe9f2fb 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java @@ -17,7 +17,7 @@ package com.alibaba.nacos.common.remote.client.grpc; /** - * sdk client for grpc. + * gRPC client for sdk. * * @author liuzunfei * @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $ diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcUtils.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcUtils.java index b87955a52..76470d524 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcUtils.java @@ -39,7 +39,7 @@ import java.io.IOException; import java.nio.charset.Charset; /** - * grpc utils, use to parse request and response. + * gRPC utils, use to parse request and response. * * @author liuzunfei * @version $Id: GrpcUtils.java, v 0.1 2020年08月09日 1:43 PM liuzunfei Exp $ @@ -94,20 +94,20 @@ public class GrpcUtils { */ public static Payload convert(Request request, RequestMeta meta) { //meta. - Payload.Builder builder = Payload.newBuilder(); + Payload.Builder payloadBuilder = Payload.newBuilder(); Metadata.Builder metaBuilder = Metadata.newBuilder(); if (meta != null) { metaBuilder.setClientIp(meta.getClientIp()).setClientPort(meta.getClientPort()) .setConnectionId(meta.getConnectionId()).putAllLabels(meta.getLabels()) - .putAllHeaders(request.getHeaders()).setClientVersion(meta.getClientVersion()) + .setClientVersion(meta.getClientVersion()).putAllHeaders(request.getHeaders()) .setType(request.getClass().getName()); } - builder.setMetadata(metaBuilder.build()); + payloadBuilder.setMetadata(metaBuilder.build()); // request body . request.clearHeaders(); String jsonString = toJson(request); - Payload payload = builder + Payload payload = payloadBuilder .setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE)))) .build(); return payload; @@ -123,14 +123,15 @@ public class GrpcUtils { */ public static Payload convert(Request request, Metadata meta) { - Metadata buildMeta = meta.toBuilder().putAllHeaders(request.getHeaders()).build(); + Metadata newMeta = meta.toBuilder().putAllHeaders(request.getHeaders()).build(); request.clearHeaders(); String jsonString = toJson(request); Payload.Builder builder = Payload.newBuilder(); + Payload payload = builder .setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE)))) - .setMetadata(buildMeta).build(); + .setMetadata(newMeta).build(); return payload; } @@ -161,15 +162,15 @@ public class GrpcUtils { */ public static PlainRequest parse(Payload payload) { PlainRequest plainRequest = new PlainRequest(); - Class classyType = PayloadRegistry.getClassByType(payload.getMetadata().getType()); - if (classyType != null) { - Object obj = toObj(payload.getBody().getValue().toString(Charset.forName(Constants.ENCODE)), classyType); + Class classType = PayloadRegistry.getClassByType(payload.getMetadata().getType()); + if (classType != null) { + Object obj = toObj(payload.getBody().getValue().toString(Charset.forName(Constants.ENCODE)), classType); if (obj instanceof Request) { ((Request) obj).putAllHeader(payload.getMetadata().getHeadersMap()); } plainRequest.body = obj; } else { - throw new RemoteException(NacosException.SERVER_ERROR, "unknown payload type:" + classyType); + throw new RemoteException(NacosException.SERVER_ERROR, "unknown payload type:" + payload.getMetadata().getType()); } plainRequest.type = payload.getMetadata().getType(); diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java index 7320d8afa..ade6d8dcf 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcAckCallbackSynchronizer.java @@ -112,35 +112,35 @@ public class RpcAckCallbackSynchronizer { /** * clear context of connectionId. * - * @param connetionId connetionId + * @param connectionId connectionId */ - public static void clearContext(String connetionId) { - CALLBACK_CONTEXT.remove(connetionId); + public static void clearContext(String connectionId) { + CALLBACK_CONTEXT.remove(connectionId); } /** * clear context of connectionId. * - * @param connetionId connetionId + * @param connectionId connectionId */ - public static Map initContextIfNecessary(String connetionId) { - if (!CALLBACK_CONTEXT.containsKey(connetionId)) { + public static Map initContextIfNecessary(String connectionId) { + if (!CALLBACK_CONTEXT.containsKey(connectionId)) { Map context = new HashMap(128); Map stringDefaultRequestFutureMap = CALLBACK_CONTEXT - .putIfAbsent(connetionId, context); + .putIfAbsent(connectionId, context); return stringDefaultRequestFutureMap == null ? context : stringDefaultRequestFutureMap; } else { - return CALLBACK_CONTEXT.get(connetionId); + return CALLBACK_CONTEXT.get(connectionId); } } /** * clear context of connectionId. * - * @param connetionId connetionId + * @param connectionId connectionId */ - public static void clearFuture(String connetionId, String requestId) { - Map stringDefaultPushFutureMap = CALLBACK_CONTEXT.get(connetionId); + public static void clearFuture(String connectionId, String requestId) { + Map stringDefaultPushFutureMap = CALLBACK_CONTEXT.get(connectionId); if (stringDefaultPushFutureMap == null || !stringDefaultPushFutureMap.containsKey(requestId)) { return; diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java index 7fb89d7d5..bbb6caff3 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java @@ -174,17 +174,17 @@ public abstract class BaseGrpcServer extends BaseRpcServer { handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor)); // bi stream register. + final MethodDescriptor biStreamMethod = MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor + .generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME)) + .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())) + .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build(); + final ServerCallHandler biStreamHandler = ServerCalls .asyncBidiStreamingCall((responseObserver) -> { return grpcBiStreamRequestAcceptor.requestBiStream(responseObserver); }); - final MethodDescriptor biStreamMethod = MethodDescriptor.newBuilder() - .setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor - .generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME)) - .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build())) - .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build(); - final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition .builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build(); handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));