make code more readable. (#4568)

* make code more readable.

* code enhance

* code enhance

* fix comment for grpc client

* initHandlerRpcClient -> initRpcClientHandler

* rename some method and modify some comments

* code enhance.

* code enhance.

* rename NamingPushResponseHandler -> NamingPushRequestHandler

* fix connectionId typo
This commit is contained in:
赵延 2020-12-29 13:54:21 +08:00 committed by GitHub
parent 5ec27bc794
commit 56348af81b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 92 additions and 89 deletions

View File

@ -16,10 +16,9 @@
package com.alibaba.nacos.api.grpc.auto;
import static io.grpc.MethodDescriptor.generateFullMethodName;
import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ServerCalls;
/**
*/
@ -43,7 +42,8 @@ public final class BiRequestStreamGrpc {
if ((getRequestBiStreamMethod = BiRequestStreamGrpc.getRequestBiStreamMethod) == null) {
BiRequestStreamGrpc.getRequestBiStreamMethod = getRequestBiStreamMethod = io.grpc.MethodDescriptor.<com.alibaba.nacos.api.grpc.auto.Payload, com.alibaba.nacos.api.grpc.auto.Payload>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
.setFullMethodName(generateFullMethodName("BiRequestStream", "requestBiStream"))
.setFullMethodName(
MethodDescriptor.generateFullMethodName("BiRequestStream", "requestBiStream"))
.setSampledToLocalTracing(true).setRequestMarshaller(io.grpc.protobuf.ProtoUtils
.marshaller(com.alibaba.nacos.api.grpc.auto.Payload.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils
@ -88,13 +88,13 @@ public final class BiRequestStreamGrpc {
*/
public io.grpc.stub.StreamObserver<com.alibaba.nacos.api.grpc.auto.Payload> requestBiStream(
io.grpc.stub.StreamObserver<com.alibaba.nacos.api.grpc.auto.Payload> responseObserver) {
return asyncUnimplementedStreamingCall(getRequestBiStreamMethod(), responseObserver);
return ServerCalls.asyncUnimplementedStreamingCall(getRequestBiStreamMethod(), responseObserver);
}
@java.lang.Override
public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()).addMethod(getRequestBiStreamMethod(),
asyncBidiStreamingCall(
ServerCalls.asyncBidiStreamingCall(
new MethodHandlers<com.alibaba.nacos.api.grpc.auto.Payload, com.alibaba.nacos.api.grpc.auto.Payload>(
this, METHODID_REQUEST_BI_STREAM))).build();
}
@ -124,7 +124,7 @@ public final class BiRequestStreamGrpc {
*/
public io.grpc.stub.StreamObserver<com.alibaba.nacos.api.grpc.auto.Payload> requestBiStream(
io.grpc.stub.StreamObserver<com.alibaba.nacos.api.grpc.auto.Payload> responseObserver) {
return asyncBidiStreamingCall(getChannel().newCall(getRequestBiStreamMethod(), getCallOptions()),
return ClientCalls.asyncBidiStreamingCall(getChannel().newCall(getRequestBiStreamMethod(), getCallOptions()),
responseObserver);
}
}

View File

@ -16,12 +16,9 @@
package com.alibaba.nacos.api.grpc.auto;
import static io.grpc.MethodDescriptor.generateFullMethodName;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.futureUnaryCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ServerCalls;
/**
*/
@ -45,7 +42,7 @@ public final class RequestGrpc {
if ((getRequestMethod = RequestGrpc.getRequestMethod) == null) {
RequestGrpc.getRequestMethod = getRequestMethod = io.grpc.MethodDescriptor.<com.alibaba.nacos.api.grpc.auto.Payload, com.alibaba.nacos.api.grpc.auto.Payload>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName("Request", "request"))
.setFullMethodName(MethodDescriptor.generateFullMethodName("Request", "request"))
.setSampledToLocalTracing(true).setRequestMarshaller(io.grpc.protobuf.ProtoUtils
.marshaller(com.alibaba.nacos.api.grpc.auto.Payload.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils
@ -89,13 +86,13 @@ public final class RequestGrpc {
*/
public void request(com.alibaba.nacos.api.grpc.auto.Payload request,
io.grpc.stub.StreamObserver<com.alibaba.nacos.api.grpc.auto.Payload> responseObserver) {
asyncUnimplementedUnaryCall(getRequestMethod(), responseObserver);
ServerCalls.asyncUnimplementedUnaryCall(getRequestMethod(), responseObserver);
}
@java.lang.Override
public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()).addMethod(getRequestMethod(),
asyncUnaryCall(
ServerCalls.asyncUnaryCall(
new MethodHandlers<com.alibaba.nacos.api.grpc.auto.Payload, com.alibaba.nacos.api.grpc.auto.Payload>(
this, METHODID_REQUEST))).build();
}
@ -125,7 +122,7 @@ public final class RequestGrpc {
*/
public void request(com.alibaba.nacos.api.grpc.auto.Payload request,
io.grpc.stub.StreamObserver<com.alibaba.nacos.api.grpc.auto.Payload> responseObserver) {
asyncUnaryCall(getChannel().newCall(getRequestMethod(), getCallOptions()), request, responseObserver);
ClientCalls.asyncUnaryCall(getChannel().newCall(getRequestMethod(), getCallOptions()), request, responseObserver);
}
}
@ -152,7 +149,7 @@ public final class RequestGrpc {
* </pre>
*/
public com.alibaba.nacos.api.grpc.auto.Payload request(com.alibaba.nacos.api.grpc.auto.Payload request) {
return blockingUnaryCall(getChannel(), getRequestMethod(), getCallOptions(), request);
return ClientCalls.blockingUnaryCall(getChannel(), getRequestMethod(), getCallOptions(), request);
}
}
@ -180,7 +177,7 @@ public final class RequestGrpc {
*/
public com.google.common.util.concurrent.ListenableFuture<com.alibaba.nacos.api.grpc.auto.Payload> request(
com.alibaba.nacos.api.grpc.auto.Payload request) {
return futureUnaryCall(getChannel().newCall(getRequestMethod(), getCallOptions()), request);
return ClientCalls.futureUnaryCall(getChannel().newCall(getRequestMethod(), getCallOptions()), request);
}
}

View File

@ -16,13 +16,12 @@
package com.alibaba.nacos.api.grpc.auto;
import static io.grpc.MethodDescriptor.generateFullMethodName;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ServerCalls;
/**
*
*/
@javax.annotation.Generated(value = "by gRPC proto compiler (version 1.20.0)", comments = "Source: nacos_grpc_service.proto")
public final class RequestStreamGrpc {
@ -43,8 +42,8 @@ public final class RequestStreamGrpc {
synchronized (RequestStreamGrpc.class) {
if ((getRequestStreamMethod = RequestStreamGrpc.getRequestStreamMethod) == null) {
RequestStreamGrpc.getRequestStreamMethod = getRequestStreamMethod = io.grpc.MethodDescriptor.<com.alibaba.nacos.api.grpc.auto.Payload, com.alibaba.nacos.api.grpc.auto.Payload>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName(generateFullMethodName("RequestStream", "requestStream"))
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING).setFullMethodName(
MethodDescriptor.generateFullMethodName("RequestStream", "requestStream"))
.setSampledToLocalTracing(true).setRequestMarshaller(io.grpc.protobuf.ProtoUtils
.marshaller(com.alibaba.nacos.api.grpc.auto.Payload.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils
@ -78,6 +77,7 @@ public final class RequestStreamGrpc {
}
/**
*
*/
public static abstract class RequestStreamImplBase implements io.grpc.BindableService {
@ -88,19 +88,20 @@ public final class RequestStreamGrpc {
*/
public void requestStream(com.alibaba.nacos.api.grpc.auto.Payload request,
io.grpc.stub.StreamObserver<com.alibaba.nacos.api.grpc.auto.Payload> responseObserver) {
asyncUnimplementedUnaryCall(getRequestStreamMethod(), responseObserver);
ServerCalls.asyncUnimplementedUnaryCall(getRequestStreamMethod(), responseObserver);
}
@java.lang.Override
public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()).addMethod(getRequestStreamMethod(),
asyncServerStreamingCall(
ServerCalls.asyncServerStreamingCall(
new MethodHandlers<com.alibaba.nacos.api.grpc.auto.Payload, com.alibaba.nacos.api.grpc.auto.Payload>(
this, METHODID_REQUEST_STREAM))).build();
}
}
/**
*
*/
public static final class RequestStreamStub extends io.grpc.stub.AbstractStub<RequestStreamStub> {
@ -111,7 +112,7 @@ public final class RequestStreamGrpc {
private RequestStreamStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected RequestStreamStub build(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new RequestStreamStub(channel, callOptions);
@ -124,12 +125,14 @@ public final class RequestStreamGrpc {
*/
public void requestStream(com.alibaba.nacos.api.grpc.auto.Payload request,
io.grpc.stub.StreamObserver<com.alibaba.nacos.api.grpc.auto.Payload> responseObserver) {
asyncServerStreamingCall(getChannel().newCall(getRequestStreamMethod(), getCallOptions()), request,
responseObserver);
ClientCalls
.asyncServerStreamingCall(getChannel().newCall(getRequestStreamMethod(), getCallOptions()), request,
responseObserver);
}
}
/**
*
*/
public static final class RequestStreamBlockingStub extends io.grpc.stub.AbstractStub<RequestStreamBlockingStub> {
@ -140,7 +143,7 @@ public final class RequestStreamGrpc {
private RequestStreamBlockingStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected RequestStreamBlockingStub build(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new RequestStreamBlockingStub(channel, callOptions);
@ -153,11 +156,13 @@ public final class RequestStreamGrpc {
*/
public java.util.Iterator<com.alibaba.nacos.api.grpc.auto.Payload> requestStream(
com.alibaba.nacos.api.grpc.auto.Payload request) {
return blockingServerStreamingCall(getChannel(), getRequestStreamMethod(), getCallOptions(), request);
return ClientCalls
.blockingServerStreamingCall(getChannel(), getRequestStreamMethod(), getCallOptions(), request);
}
}
/**
*
*/
public static final class RequestStreamFutureStub extends io.grpc.stub.AbstractStub<RequestStreamFutureStub> {
@ -168,7 +173,7 @@ public final class RequestStreamGrpc {
private RequestStreamFutureStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected RequestStreamFutureStub build(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new RequestStreamFutureStub(channel, callOptions);
@ -188,7 +193,7 @@ public final class RequestStreamGrpc {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
@ -201,7 +206,7 @@ public final class RequestStreamGrpc {
throw new AssertionError();
}
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public io.grpc.stub.StreamObserver<Req> invoke(io.grpc.stub.StreamObserver<Resp> responseObserver) {
@ -217,12 +222,12 @@ public final class RequestStreamGrpc {
RequestStreamBaseDescriptorSupplier() {
}
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
return com.alibaba.nacos.api.grpc.auto.NacosGrpcService.getDescriptor();
}
@java.lang.Override
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
return getFileDescriptor().findServiceByName("RequestStream");
@ -243,7 +248,7 @@ public final class RequestStreamGrpc {
RequestStreamMethodDescriptorSupplier(String methodName) {
this.methodName = methodName;
}
@java.lang.Override
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
return getServiceDescriptor().findMethodByName(methodName);

View File

@ -555,11 +555,11 @@ public class ClientWorker implements Closeable {
return labels;
}
private void initHandlerRpcClient(final RpcClient rpcClientInner) {
private void initRpcClientHandler(final RpcClient rpcClientInner) {
/*
* Register Listen Change Handler
*/
rpcClientInner.registerServerPushResponseHandler((request, requestMeta) -> {
rpcClientInner.registerServerRequestHandler((request, requestMeta) -> {
if (request instanceof ConfigChangeNotifyRequest) {
ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
LOGGER.info("[{}] [server-push] config changed. dataId={}, group={}", getName(),
@ -818,7 +818,7 @@ public class ClientWorker implements Closeable {
RpcClient rpcClient = RpcClientFactory
.createClient("config-" + taskId + "-" + uuid, getConnectionType(), newlabels);
if (rpcClient.isWaitInitiated()) {
initHandlerRpcClient(rpcClient);
initRpcClientHandler(rpcClient);
rpcClient.start();
}

View File

@ -86,7 +86,7 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
rpcClient.init(serverListFactory);
rpcClient.start();
rpcClient.registerServerPushResponseHandler(new NamingPushResponseHandler(serviceInfoHolder));
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
}

View File

@ -25,15 +25,15 @@ import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
/**
* Naming push response handler.
* Naming push request handler.
*
* @author xiweng.yy
*/
public class NamingPushResponseHandler implements ServerRequestHandler {
public class NamingPushRequestHandler implements ServerRequestHandler {
private final ServiceInfoHolder serviceInfoHolder;
public NamingPushResponseHandler(ServiceInfoHolder serviceInfoHolder) {
public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) {
this.serviceInfoHolder = serviceInfoHolder;
}

View File

@ -83,7 +83,7 @@ public abstract class RpcClient implements Closeable {
private static final long DEFAULT_TIMEOUT_MILLS = 3000L;
/**
* listener called where connect status changed.
* listener called where connection's status changed.
*/
protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<ConnectionEventListener>();
@ -283,7 +283,7 @@ public abstract class RpcClient implements Closeable {
switchServerAsync();
}
registerServerPushResponseHandler(new ConnectResetRequestHandler());
registerServerRequestHandler(new ConnectResetRequestHandler());
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
@ -700,7 +700,7 @@ public abstract class RpcClient implements Closeable {
}
/**
* register connection handler.will be notified when inner connect changed.
* Register connection handler. Will be notified when inner connection's state changed.
*
* @param connectionEventListener connectionEventListener
*/
@ -712,11 +712,11 @@ public abstract class RpcClient implements Closeable {
}
/**
* register change listeners ,will be called when server send change notify response th current client.
* Register serverRequestHandler, the handler will handle the request from server side.
*
* @param serverRequestHandler serverRequestHandler
*/
public synchronized void registerServerPushResponseHandler(ServerRequestHandler serverRequestHandler) {
public synchronized void registerServerRequestHandler(ServerRequestHandler serverRequestHandler) {
LoggerUtils.printIfInfoEnabled(LOGGER, "Register server push request handler:{}",
serverRequestHandler.getClass().getName());

View File

@ -21,10 +21,10 @@ import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
/**
* ServerPushResponseHandler.
* ServerRequestHandler, to process the request from server side.
*
* @author liuzunfei
* @version $Id: ServerPushResponseHandler.java, v 0.1 2020年07月14日 11:41 AM liuzunfei Exp $
* @version $Id: ServerRequestHandler.java, v 0.1 2020年07月14日 11:41 AM liuzunfei Exp $
*/
public interface ServerRequestHandler {

View File

@ -17,10 +17,10 @@
package com.alibaba.nacos.common.remote.client.grpc;
/**
* sdk client for grpc.
* gRPC client for cluster.
*
* @author liuzunfei
* @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $
* @version $Id: GrpcClusterClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $
*/
public class GrpcClusterClient extends GrpcClient {

View File

@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* grpc connection.
* gRPC connection.
*
* @author liuzunfei
* @version $Id: GrpcConnection.java, v 0.1 2020年08月09日 1:36 PM liuzunfei Exp $

View File

@ -17,10 +17,10 @@
package com.alibaba.nacos.common.remote.client.grpc;
/**
* sdk client for grpc.
* gRPC client for ops.
*
* @author liuzunfei
* @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $
* @version $Id: GrpcOpsClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $
*/
public class GrpcOpsClient extends GrpcClient {

View File

@ -17,7 +17,7 @@
package com.alibaba.nacos.common.remote.client.grpc;
/**
* sdk client for grpc.
* gRPC client for sdk.
*
* @author liuzunfei
* @version $Id: GrpcSdkClient.java, v 0.1 2020年09月07日 11:05 AM liuzunfei Exp $

View File

@ -39,7 +39,7 @@ import java.io.IOException;
import java.nio.charset.Charset;
/**
* grpc utils, use to parse request and response.
* gRPC utils, use to parse request and response.
*
* @author liuzunfei
* @version $Id: GrpcUtils.java, v 0.1 2020年08月09日 1:43 PM liuzunfei Exp $
@ -94,20 +94,20 @@ public class GrpcUtils {
*/
public static Payload convert(Request request, RequestMeta meta) {
//meta.
Payload.Builder builder = Payload.newBuilder();
Payload.Builder payloadBuilder = Payload.newBuilder();
Metadata.Builder metaBuilder = Metadata.newBuilder();
if (meta != null) {
metaBuilder.setClientIp(meta.getClientIp()).setClientPort(meta.getClientPort())
.setConnectionId(meta.getConnectionId()).putAllLabels(meta.getLabels())
.putAllHeaders(request.getHeaders()).setClientVersion(meta.getClientVersion())
.setClientVersion(meta.getClientVersion()).putAllHeaders(request.getHeaders())
.setType(request.getClass().getName());
}
builder.setMetadata(metaBuilder.build());
payloadBuilder.setMetadata(metaBuilder.build());
// request body .
request.clearHeaders();
String jsonString = toJson(request);
Payload payload = builder
Payload payload = payloadBuilder
.setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE))))
.build();
return payload;
@ -123,14 +123,15 @@ public class GrpcUtils {
*/
public static Payload convert(Request request, Metadata meta) {
Metadata buildMeta = meta.toBuilder().putAllHeaders(request.getHeaders()).build();
Metadata newMeta = meta.toBuilder().putAllHeaders(request.getHeaders()).build();
request.clearHeaders();
String jsonString = toJson(request);
Payload.Builder builder = Payload.newBuilder();
Payload payload = builder
.setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE))))
.setMetadata(buildMeta).build();
.setMetadata(newMeta).build();
return payload;
}
@ -161,15 +162,15 @@ public class GrpcUtils {
*/
public static PlainRequest parse(Payload payload) {
PlainRequest plainRequest = new PlainRequest();
Class classyType = PayloadRegistry.getClassByType(payload.getMetadata().getType());
if (classyType != null) {
Object obj = toObj(payload.getBody().getValue().toString(Charset.forName(Constants.ENCODE)), classyType);
Class classType = PayloadRegistry.getClassByType(payload.getMetadata().getType());
if (classType != null) {
Object obj = toObj(payload.getBody().getValue().toString(Charset.forName(Constants.ENCODE)), classType);
if (obj instanceof Request) {
((Request) obj).putAllHeader(payload.getMetadata().getHeadersMap());
}
plainRequest.body = obj;
} else {
throw new RemoteException(NacosException.SERVER_ERROR, "unknown payload type:" + classyType);
throw new RemoteException(NacosException.SERVER_ERROR, "unknown payload type:" + payload.getMetadata().getType());
}
plainRequest.type = payload.getMetadata().getType();

View File

@ -112,35 +112,35 @@ public class RpcAckCallbackSynchronizer {
/**
* clear context of connectionId.
*
* @param connetionId connetionId
* @param connectionId connectionId
*/
public static void clearContext(String connetionId) {
CALLBACK_CONTEXT.remove(connetionId);
public static void clearContext(String connectionId) {
CALLBACK_CONTEXT.remove(connectionId);
}
/**
* clear context of connectionId.
*
* @param connetionId connetionId
* @param connectionId connectionId
*/
public static Map<String, DefaultRequestFuture> initContextIfNecessary(String connetionId) {
if (!CALLBACK_CONTEXT.containsKey(connetionId)) {
public static Map<String, DefaultRequestFuture> initContextIfNecessary(String connectionId) {
if (!CALLBACK_CONTEXT.containsKey(connectionId)) {
Map<String, DefaultRequestFuture> context = new HashMap<String, DefaultRequestFuture>(128);
Map<String, DefaultRequestFuture> stringDefaultRequestFutureMap = CALLBACK_CONTEXT
.putIfAbsent(connetionId, context);
.putIfAbsent(connectionId, context);
return stringDefaultRequestFutureMap == null ? context : stringDefaultRequestFutureMap;
} else {
return CALLBACK_CONTEXT.get(connetionId);
return CALLBACK_CONTEXT.get(connectionId);
}
}
/**
* clear context of connectionId.
*
* @param connetionId connetionId
* @param connectionId connectionId
*/
public static void clearFuture(String connetionId, String requestId) {
Map<String, DefaultRequestFuture> stringDefaultPushFutureMap = CALLBACK_CONTEXT.get(connetionId);
public static void clearFuture(String connectionId, String requestId) {
Map<String, DefaultRequestFuture> stringDefaultPushFutureMap = CALLBACK_CONTEXT.get(connectionId);
if (stringDefaultPushFutureMap == null || !stringDefaultPushFutureMap.containsKey(requestId)) {
return;

View File

@ -174,17 +174,17 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
// bi stream register.
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls
.asyncBidiStreamingCall((responseObserver) -> {
return grpcBiStreamRequestAcceptor.requestBiStream(responseObserver);
});
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
.builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));